From 8c0b87c6670518dc1a990158ba003667e692c8e0 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Tue, 16 Feb 2021 10:06:42 +0100 Subject: [PATCH] DBZ-2984 Guess struct name only when unavailable --- .../transforms/ExtractNewRecordState.java | 2 +- .../transforms/ExtractNewRecordStateTest.java | 30 +++++++++++++++++-- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/debezium-core/src/main/java/io/debezium/transforms/ExtractNewRecordState.java b/debezium-core/src/main/java/io/debezium/transforms/ExtractNewRecordState.java index fc063c5f2..52236263c 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/ExtractNewRecordState.java +++ b/debezium-core/src/main/java/io/debezium/transforms/ExtractNewRecordState.java @@ -324,7 +324,7 @@ private FieldReference(String prefix, String field) { String[] parts = NEW_FIELD_SEPARATOR.split(field); String[] splits = FIELD_SEPARATOR.split(parts[0]); this.field = splits.length == 1 ? splits[0] : splits[1]; - this.struct = determineStruct(this.field); + this.struct = (splits.length == 1) ? determineStruct(this.field) : splits[0]; if (parts.length == 1) { this.newField = prefix + (splits.length == 1 ? this.field : this.struct + "_" + this.field); diff --git a/debezium-core/src/test/java/io/debezium/transforms/ExtractNewRecordStateTest.java b/debezium-core/src/test/java/io/debezium/transforms/ExtractNewRecordStateTest.java index ed3b2940c..7295607df 100644 --- a/debezium-core/src/test/java/io/debezium/transforms/ExtractNewRecordStateTest.java +++ b/debezium-core/src/test/java/io/debezium/transforms/ExtractNewRecordStateTest.java @@ -37,12 +37,13 @@ public class ExtractNewRecordStateTest { private static final String ADD_HEADERS_PREFIX = ADD_HEADERS + ".prefix"; final Schema recordSchema = SchemaBuilder.struct() - .field("id", SchemaBuilder.int8()) - .field("name", SchemaBuilder.string()) + .field("id", Schema.INT8_SCHEMA) + .field("name", Schema.STRING_SCHEMA) .build(); final Schema sourceSchema = SchemaBuilder.struct() - .field("lsn", SchemaBuilder.int32()) + .field("lsn", Schema.INT32_SCHEMA) + .field("ts_ms", Schema.OPTIONAL_INT32_SCHEMA) .build(); final Envelope envelope = Envelope.defineSchema() @@ -120,6 +121,7 @@ private SourceRecord createCreateRecord() { before.put("id", (byte) 1); before.put("name", "myRecord"); source.put("lsn", 1234); + source.put("ts_ms", 12836); final Struct payload = envelope.create(before, source, Instant.now()); return new SourceRecord(new HashMap<>(), new HashMap<>(), "dummy", envelope.schema(), payload); } @@ -319,6 +321,28 @@ public void testAddField() { } } + @Test + @FixFor("DBZ-2984") + public void testAddTimestamp() { + try (final ExtractNewRecordState transform = new ExtractNewRecordState<>()) { + final Map props1 = new HashMap<>(); + props1.put(ADD_FIELDS, "ts_ms"); + transform.configure(props1); + + final SourceRecord createRecord1 = createCreateRecord(); + final SourceRecord unwrapped1 = transform.apply(createRecord1); + assertThat(((Struct) unwrapped1.value()).get("__ts_ms")).isNotNull(); + + final Map props2 = new HashMap<>(); + props2.put(ADD_FIELDS, "source.ts_ms"); + transform.configure(props2); + + final SourceRecord createRecord2 = createCreateRecord(); + final SourceRecord unwrapped2 = transform.apply(createRecord2); + assertThat(((Struct) unwrapped2.value()).get("__source_ts_ms")).isNotNull(); + } + } + @Test @FixFor({ "DBZ-1452", "DBZ-2504" }) public void testAddFields() {