DBZ-2606: Map new name for the fields and headers

Added the ability to map new names for the fields and headers.
The "new name" mapping is completely optional and does not
affect the existing functionality.

The optional "new name" is case-sensitive.
This commit is contained in:
Ganesh Ramasubramanian 2020-10-05 09:34:14 +02:00 committed by Jiri Pechanec
parent fd6c8f9502
commit 4716194ff9
4 changed files with 60 additions and 23 deletions

View File

@ -67,6 +67,7 @@ public class ExtractNewRecordState<R extends ConnectRecord<R>> implements Transf
private static final String PURPOSE = "source field insertion"; private static final String PURPOSE = "source field insertion";
private static final int SCHEMA_CACHE_SIZE = 64; private static final int SCHEMA_CACHE_SIZE = 64;
private static final Pattern FIELD_SEPARATOR = Pattern.compile("\\."); private static final Pattern FIELD_SEPARATOR = Pattern.compile("\\.");
private static final Pattern NEW_FIELD_SEPARATOR = Pattern.compile(":");
private boolean dropTombstones; private boolean dropTombstones;
private DeleteHandling handleDeletes; private DeleteHandling handleDeletes;
@ -212,11 +213,11 @@ private Headers makeHeaders(List<FieldReference> additionalHeaders, Struct origi
// add "d" operation header to tombstone events // add "d" operation header to tombstone events
if (originalRecordValue == null) { if (originalRecordValue == null) {
if (FieldName.OPERATION.equals(fieldReference.field)) { if (FieldName.OPERATION.equals(fieldReference.field)) {
headers.addString(fieldReference.newFieldName, Operation.DELETE.code()); headers.addString(fieldReference.getNewField(), Operation.DELETE.code());
} }
continue; continue;
} }
headers.add(fieldReference.getNewFieldName(), fieldReference.getValue(originalRecordValue), headers.add(fieldReference.getNewField(), fieldReference.getValue(originalRecordValue),
fieldReference.getSchema(originalRecordValue.schema())); fieldReference.getSchema(originalRecordValue.schema()));
} }
@ -267,11 +268,11 @@ private Schema makeUpdatedSchema(List<FieldReference> additionalFields, Schema s
} }
private SchemaBuilder updateSchema(FieldReference fieldReference, SchemaBuilder builder, Schema originalRecordSchema) { private SchemaBuilder updateSchema(FieldReference fieldReference, SchemaBuilder builder, Schema originalRecordSchema) {
return builder.field(fieldReference.getNewFieldName(), fieldReference.getSchema(originalRecordSchema)); return builder.field(fieldReference.getNewField(), fieldReference.getSchema(originalRecordSchema));
} }
private Struct updateValue(FieldReference fieldReference, Struct updatedValue, Struct struct) { private Struct updateValue(FieldReference fieldReference, Struct updatedValue, Struct struct) {
return updatedValue.put(fieldReference.getNewFieldName(), fieldReference.getValue(struct)); return updatedValue.put(fieldReference.getNewField(), fieldReference.getValue(struct));
} }
@Override @Override
@ -312,29 +313,24 @@ private static class FieldReference {
* The prefix for the new field name. * The prefix for the new field name.
*/ */
private final String prefix; private final String prefix;
/** /**
* The name for the outgoing attribute/field, e.g. "__op" or "__source_ts_ms" when the prefix is "__" * The name for the outgoing attribute/field, e.g. "__op" or "__source_ts_ms" when the prefix is "__"
*/ */
private final String newFieldName; private final String newField;
private FieldReference(String prefix, String field) { private FieldReference(String prefix, String field) {
this.prefix = prefix; this.prefix = prefix;
String[] parts = FIELD_SEPARATOR.split(field); String[] parts = NEW_FIELD_SEPARATOR.split(field);
String[] splits = FIELD_SEPARATOR.split(parts[0]);
this.field = splits.length == 1 ? splits[0] : splits[1];
this.struct = determineStruct(this.field);
if (parts.length == 1) { if (parts.length == 1) {
this.struct = determineStruct(parts[0]); this.newField = prefix + (splits.length == 1 ? this.field : this.struct + "_" + this.field);
this.field = parts[0];
this.newFieldName = prefix + field;
} }
else if (parts.length == 2) { else if (parts.length == 2) {
this.struct = parts[0]; this.newField = prefix + parts[1];
if (!(this.struct.equals(Envelope.FieldName.SOURCE) || this.struct.equals(Envelope.FieldName.TRANSACTION))) {
throw new IllegalArgumentException("Unexpected field name: " + field);
}
this.field = parts[1];
this.newFieldName = prefix + this.struct + "_" + this.field;
} }
else { else {
throw new IllegalArgumentException("Unexpected field name: " + field); throw new IllegalArgumentException("Unexpected field name: " + field);
@ -370,8 +366,8 @@ static List<FieldReference> fromConfiguration(String fieldPrefix, String addHead
} }
} }
String getNewFieldName() { public String getNewField() {
return newFieldName; return this.newField;
} }
Object getValue(Struct originalRecordValue) { Object getValue(Struct originalRecordValue) {

View File

@ -108,8 +108,10 @@ public static DeleteHandling parse(String value, String defaultValue) {
.withWidth(ConfigDef.Width.LONG) .withWidth(ConfigDef.Width.LONG)
.withImportance(ConfigDef.Importance.LOW) .withImportance(ConfigDef.Importance.LOW)
.withDefault("") .withDefault("")
.withDescription("Adds each field listed, prefixed with __ (or __<struct>_ if the struct is specified) " .withDescription("Adds each field listed, prefixed with __ (or __<struct>_ if the struct is specified). "
+ "Example: 'version,connector,source.ts_ms' would add __version, __connector and __source_ts_ms fields"); + "Example: 'version,connector,source.ts_ms' would add __version, __connector and __source_ts_ms fields. "
+ "Optionally one can also map new field name like version:VERSION,connector:CONNECTOR,source.ts_ms:EVENT_TIMESTAMP."
+ "Please note that the new field name is case-sensitive.");
public static final Field ADD_HEADERS_PREFIX = Field.create("add.headers.prefix") public static final Field ADD_HEADERS_PREFIX = Field.create("add.headers.prefix")
.withDisplayName("Header prefix to be added to each header.") .withDisplayName("Header prefix to be added to each header.")
@ -125,7 +127,9 @@ public static DeleteHandling parse(String value, String defaultValue) {
.withWidth(ConfigDef.Width.LONG) .withWidth(ConfigDef.Width.LONG)
.withImportance(ConfigDef.Importance.LOW) .withImportance(ConfigDef.Importance.LOW)
.withDefault("") .withDefault("")
.withDescription("Adds each field listed to the header, __ (or __<struct>_ if the struct is specified) " .withDescription("Adds each field listed to the header, __ (or __<struct>_ if the struct is specified). "
+ "Example: 'version,connector,source.ts_ms' would add __version, __connector and __source_ts_ms fields"); + "Example: 'version,connector,source.ts_ms' would add __version, __connector and __source_ts_ms fields. "
+ "Optionally one can also map new field name like version:VERSION,connector:CONNECTOR,source.ts_ms:EVENT_TIMESTAMP."
+ "Please note that the new field name is case-sensitive.");
} }

View File

@ -336,6 +336,39 @@ public void testAddFields() {
} }
} }
@Test
@FixFor({ "DBZ-2606" })
public void testNewFieldAndHeaderMapping() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
String fieldPrefix = "";
String headerPrefix = "prefix.";
props.put(ADD_FIELDS, "op:OP, lsn:LSN, id:ID, source.lsn:source_lsn, transaction.total_order:TOTAL_ORDER");
props.put(ADD_FIELDS_PREFIX, fieldPrefix);
props.put(ADD_HEADERS, "op, source.lsn:source_lsn, transaction.id:TXN_ID, transaction.total_order:TOTAL_ORDER");
props.put(ADD_HEADERS_PREFIX, headerPrefix);
transform.configure(props);
final SourceRecord updateRecord = createUpdateRecord();
final SourceRecord unwrapped = transform.apply(updateRecord);
assertThat(((Struct) unwrapped.value()).get(fieldPrefix + "OP")).isEqualTo(Envelope.Operation.UPDATE.code());
assertThat(((Struct) unwrapped.value()).get(fieldPrefix + "LSN")).isEqualTo(1234);
assertThat(((Struct) unwrapped.value()).get(fieldPrefix + "ID")).isEqualTo("571");
assertThat(((Struct) unwrapped.value()).get(fieldPrefix + "source_lsn")).isEqualTo(1234);
assertThat(((Struct) unwrapped.value()).get(fieldPrefix + "TOTAL_ORDER")).isEqualTo(42L);
assertThat(unwrapped.headers()).hasSize(4);
String headerValue = getSourceRecordHeaderByKey(unwrapped, headerPrefix + "op");
assertThat(headerValue).isEqualTo(Envelope.Operation.UPDATE.code());
headerValue = getSourceRecordHeaderByKey(unwrapped, headerPrefix + "source_lsn");
assertThat(headerValue).isEqualTo(String.valueOf(1234));
headerValue = getSourceRecordHeaderByKey(unwrapped, headerPrefix + "TXN_ID");
assertThat(headerValue).isEqualTo(String.valueOf(571L));
headerValue = getSourceRecordHeaderByKey(unwrapped, headerPrefix + "TOTAL_ORDER");
assertThat(headerValue).isEqualTo(String.valueOf(42L));
}
}
@Test @Test
@FixFor("DBZ-1452") @FixFor("DBZ-1452")
public void testAddFieldsForMissingOptionalField() { public void testAddFieldsForMissingOptionalField() {

View File

@ -266,6 +266,8 @@ If you are configuring the event flattening SMT on a sink connector, setting thi
| |
|Set this option to a comma-separated list, with no spaces, of metadata fields to add to the simplified Kafka record's value. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example `source.ts_ms`. + |Set this option to a comma-separated list, with no spaces, of metadata fields to add to the simplified Kafka record's value. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example `source.ts_ms`. +
+ +
Optionally, you can override the field name by specifying `<field name>:<new field name>`, for example `version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP`. Please note that the `new field name` is case-sensitive. +
+
When the SMT adds metadata fields to the simplified record's value, it prefixes each metadata field name with a double underscore. For a struct specification, the SMT also inserts an underscore between the struct name and the field name. + When the SMT adds metadata fields to the simplified record's value, it prefixes each metadata field name with a double underscore. For a struct specification, the SMT also inserts an underscore between the struct name and the field name. +
+ +
If you specify a field that is not in the change event record, the SMT still adds the field to the record's value. If you specify a field that is not in the change event record, the SMT still adds the field to the record's value.
@ -280,6 +282,8 @@ If you specify a field that is not in the change event record, the SMT still add
| |
|Set this option to a comma-separated list, with no spaces, of metadata fields to add to the header of the simplified Kafka record. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example `source.ts_ms`. + |Set this option to a comma-separated list, with no spaces, of metadata fields to add to the header of the simplified Kafka record. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example `source.ts_ms`. +
+ +
Optionally, you can override the field name by specifying `<field name>:<new field name>`, for example `version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP`. Please note that the `new field name` is case-sensitive. +
+
When the SMT adds metadata fields to the simplified record's header, it prefixes each metadata field name with a double underscore. For a struct specification, the SMT also inserts an underscore between the struct name and the field name. + When the SMT adds metadata fields to the simplified record's header, it prefixes each metadata field name with a double underscore. For a struct specification, the SMT also inserts an underscore between the struct name and the field name. +
+ +
If you specify a field that is not in the change event record, the SMT does not add the field to the header. If you specify a field that is not in the change event record, the SMT does not add the field to the header.