DBZ-2984 Guess struct name only when unavailable

This commit is contained in:
Jiri Pechanec 2021-02-16 10:06:42 +01:00 committed by Gunnar Morling
parent ad34b64485
commit 8c0b87c667
2 changed files with 28 additions and 4 deletions

View File

@ -324,7 +324,7 @@ private FieldReference(String prefix, String field) {
String[] parts = NEW_FIELD_SEPARATOR.split(field);
String[] splits = FIELD_SEPARATOR.split(parts[0]);
this.field = splits.length == 1 ? splits[0] : splits[1];
this.struct = determineStruct(this.field);
this.struct = (splits.length == 1) ? determineStruct(this.field) : splits[0];
if (parts.length == 1) {
this.newField = prefix + (splits.length == 1 ? this.field : this.struct + "_" + this.field);

View File

@ -37,12 +37,13 @@ public class ExtractNewRecordStateTest {
private static final String ADD_HEADERS_PREFIX = ADD_HEADERS + ".prefix";
final Schema recordSchema = SchemaBuilder.struct()
.field("id", SchemaBuilder.int8())
.field("name", SchemaBuilder.string())
.field("id", Schema.INT8_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.build();
final Schema sourceSchema = SchemaBuilder.struct()
.field("lsn", SchemaBuilder.int32())
.field("lsn", Schema.INT32_SCHEMA)
.field("ts_ms", Schema.OPTIONAL_INT32_SCHEMA)
.build();
final Envelope envelope = Envelope.defineSchema()
@ -120,6 +121,7 @@ private SourceRecord createCreateRecord() {
before.put("id", (byte) 1);
before.put("name", "myRecord");
source.put("lsn", 1234);
source.put("ts_ms", 12836);
final Struct payload = envelope.create(before, source, Instant.now());
return new SourceRecord(new HashMap<>(), new HashMap<>(), "dummy", envelope.schema(), payload);
}
@ -319,6 +321,28 @@ public void testAddField() {
}
}
@Test
@FixFor("DBZ-2984")
public void testAddTimestamp() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props1 = new HashMap<>();
props1.put(ADD_FIELDS, "ts_ms");
transform.configure(props1);
final SourceRecord createRecord1 = createCreateRecord();
final SourceRecord unwrapped1 = transform.apply(createRecord1);
assertThat(((Struct) unwrapped1.value()).get("__ts_ms")).isNotNull();
final Map<String, String> props2 = new HashMap<>();
props2.put(ADD_FIELDS, "source.ts_ms");
transform.configure(props2);
final SourceRecord createRecord2 = createCreateRecord();
final SourceRecord unwrapped2 = transform.apply(createRecord2);
assertThat(((Struct) unwrapped2.value()).get("__source_ts_ms")).isNotNull();
}
}
@Test
@FixFor({ "DBZ-1452", "DBZ-2504" })
public void testAddFields() {