diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/Transformations.java b/debezium-embedded/src/main/java/io/debezium/embedded/Transformations.java index ffc2311c2..ebe032af6 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/Transformations.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/Transformations.java @@ -97,9 +97,10 @@ record = t.apply(record); return record; } - private static Transformation createPredicateTransformation( - boolean negate, Predicate predicate, + private static Transformation createPredicateTransformation(boolean negate, + Predicate predicate, Transformation transformation) { + return new Transformation<>() { @Override public SourceRecord apply(SourceRecord sourceRecord) { @@ -127,7 +128,6 @@ public void close() { @Override public void configure(Map map) { - } }; } diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java index 3e1b69d8b..a0576ae6a 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java @@ -84,7 +84,9 @@ public void configure(Map configs) { @Override public SourceRecord apply(SourceRecord record) { - return ((String) record.value()).equals("Generated line number 1") ? null : record; + final String payload = (String) record.value(); + return payload.equals("Generated line number 1") || payload.equals("Generated line number 2") ? null + : record; } @Override diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/TransformationsTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/TransformationsTest.java index b58d7109e..fe137dc2e 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/TransformationsTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/TransformationsTest.java @@ -7,6 +7,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import java.io.IOException; import java.util.Collections; @@ -46,7 +47,6 @@ public void test() throws IOException { properties.setProperty("transforms.b.value.literal", "b"); properties.setProperty("transforms.b.predicate", "hasheader"); properties.setProperty("transforms.b.negate", "true"); - properties.setProperty("transforms.b.predicate", "hasheader"); Configuration configuration = Configuration.from(properties); SourceRecord updated; @@ -60,6 +60,12 @@ public void test() throws IOException { assertNotNull(updated.headers().lastWithName("h1")); assertEquals("a", updated.headers().lastWithName("h1").value()); + // record does not have header, transformation not applied + a = transformations.getTransformation("a"); + updated = a.apply(new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), "t1", 1, null, "key", null, "value", System.currentTimeMillis(), + new ConnectHeaders())); + assertNull(updated.headers().lastWithName("h1")); + // record does not have header, but transformation will still apply due to negation Transformation b = transformations.getTransformation("b"); updated = b.apply(new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), "t1", 1, null, "key", null, "value", System.currentTimeMillis(),