DBZ-2504: Add support for field and header prefix

Added support for field and header prefix.
Updated the documentation to include the options
 'add.fields.prefix' and 'add.headers.prefix'.
This commit is contained in:
Ganesh Ramasubramanian 2020-09-07 19:17:58 +02:00 committed by Gunnar Morling
parent 4ddb1d21c8
commit 6ca025b1bb
7 changed files with 88 additions and 35 deletions

View File

@ -63,6 +63,8 @@
*/
public class ExtractNewDocumentState<R extends ConnectRecord<R>> implements Transformation<R> {
private String addFieldsPrefix;
public enum ArrayEncoding implements EnumeratedValue {
ARRAY("array"),
DOCUMENT("document");
@ -301,7 +303,7 @@ private R newRecord(R record, BsonDocument keyDocument, BsonDocument valueDocume
}
if (addSourceFields != null) {
addSourceFieldsSchema(addSourceFields, record, valueSchemaBuilder);
addSourceFieldsSchema(addFieldsPrefix, addSourceFields, record, valueSchemaBuilder);
}
if (!additionalFields.isEmpty()) {
@ -342,13 +344,13 @@ private R newRecord(R record, BsonDocument keyDocument, BsonDocument valueDocume
return newRecord;
}
private void addSourceFieldsSchema(List<String> addSourceFields, R originalRecord, SchemaBuilder valueSchemaBuilder) {
private void addSourceFieldsSchema(String fieldPrefix, List<String> addSourceFields, R originalRecord, SchemaBuilder valueSchemaBuilder) {
Schema sourceSchema = originalRecord.valueSchema().field("source").schema();
for (String sourceField : addSourceFields) {
if (sourceSchema.field(sourceField) == null) {
throw new ConfigException("Source field specified in 'add.source.fields' does not exist: " + sourceField);
}
valueSchemaBuilder.field(ExtractNewRecordStateConfigDefinition.METADATA_FIELD_PREFIX + sourceField,
valueSchemaBuilder.field(fieldPrefix + sourceField,
sourceSchema.field(sourceField).schema());
}
}
@ -489,8 +491,10 @@ public void configure(final Map<String, ?> map) {
addSourceFields = determineAdditionalSourceField(config.getString(ADD_SOURCE_FIELDS));
additionalHeaders = FieldReference.fromConfiguration(config.getString(ExtractNewRecordStateConfigDefinition.ADD_HEADERS));
additionalFields = FieldReference.fromConfiguration(config.getString(ExtractNewRecordStateConfigDefinition.ADD_FIELDS));
addFieldsPrefix = config.getString(ExtractNewRecordStateConfigDefinition.ADD_FIELDS_PREFIX);
String addHeadersPrefix = config.getString(ExtractNewRecordStateConfigDefinition.ADD_HEADERS_PREFIX);
additionalHeaders = FieldReference.fromConfiguration(addHeadersPrefix, config.getString(ExtractNewRecordStateConfigDefinition.ADD_HEADERS));
additionalFields = FieldReference.fromConfiguration(addFieldsPrefix, config.getString(ExtractNewRecordStateConfigDefinition.ADD_FIELDS));
flattenStruct = config.getBoolean(FLATTEN_STRUCT);
delimiter = config.getString(DELIMITER);
@ -541,13 +545,13 @@ private static class FieldReference {
*/
private final String newFieldName;
private FieldReference(String field) {
private FieldReference(String prefix, String field) {
String[] parts = FIELD_SEPARATOR.split(field);
if (parts.length == 1) {
this.struct = determineStruct(parts[0]);
this.field = parts[0];
this.newFieldName = ExtractNewRecordStateConfigDefinition.METADATA_FIELD_PREFIX + field;
this.newFieldName = prefix + field;
}
else if (parts.length == 2) {
this.struct = parts[0];
@ -557,7 +561,7 @@ else if (parts.length == 2) {
}
this.field = parts[1];
this.newFieldName = ExtractNewRecordStateConfigDefinition.METADATA_FIELD_PREFIX + this.struct + "_" + this.field;
this.newFieldName = prefix + this.struct + "_" + this.field;
}
else {
throw new IllegalArgumentException("Unexpected field value: " + field);
@ -583,14 +587,14 @@ else if (simpleFieldName.equals(TransactionMonitor.DEBEZIUM_TRANSACTION_ID_KEY)
}
}
static List<FieldReference> fromConfiguration(String addHeadersConfig) {
static List<FieldReference> fromConfiguration(String fieldPrefix, String addHeadersConfig) {
if (Strings.isNullOrEmpty(addHeadersConfig)) {
return Collections.emptyList();
}
else {
return Arrays.stream(addHeadersConfig.split(","))
.map(String::trim)
.map(FieldReference::new)
.map(field -> new FieldReference(fieldPrefix, field))
.collect(Collectors.toList());
}
}

View File

@ -57,6 +57,8 @@ public class ExtractNewDocumentStateTestIT extends AbstractExtractNewDocumentSta
private static final String ADD_SOURCE_FIELDS = "add.source.fields";
private static final String ADD_HEADERS = "add.headers";
private static final String ADD_FIELDS = "add.fields";
private static final String ADD_FIELDS_PREFIX = ADD_FIELDS + ".prefix";
private static final String ADD_HEADERS_PREFIX = ADD_HEADERS + ".prefix";
@Override
protected String getCollectionName() {
@ -1232,20 +1234,21 @@ public void testAddHeadersForMissingOrInvalidFields() throws Exception {
}
@Test
@FixFor("DBZ-1791")
@FixFor({ "DBZ-1791", "DBZ-2504" })
public void testAddHeadersSpecifyingStruct() throws Exception {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(ADD_HEADERS, "op,source.rs,source.collection");
props.put(ADD_HEADERS_PREFIX, "prefix.");
transformation.configure(props);
final SourceRecord createRecord = createCreateRecord();
final SourceRecord transformed = transformation.apply(createRecord);
assertThat(transformed.headers()).hasSize(3);
assertThat(getSourceRecordHeaderByKey(transformed, "__op")).isEqualTo(Operation.CREATE.code());
assertThat(getSourceRecordHeaderByKey(transformed, "__source_rs")).isEqualTo("rs0");
assertThat(getSourceRecordHeaderByKey(transformed, "__source_collection")).isEqualTo(getCollectionName());
assertThat(getSourceRecordHeaderByKey(transformed, "prefix.op")).isEqualTo(Operation.CREATE.code());
assertThat(getSourceRecordHeaderByKey(transformed, "prefix.source_rs")).isEqualTo("rs0");
assertThat(getSourceRecordHeaderByKey(transformed, "prefix.source_collection")).isEqualTo(getCollectionName());
}
@Test
@ -1263,18 +1266,19 @@ public void testAddField() throws Exception {
}
@Test
@FixFor("DBZ-1791")
@FixFor({ "DBZ-1791", "DBZ-2504" })
public void testAddFields() throws Exception {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(ADD_FIELDS, "op , ts_ms");
props.put(ADD_FIELDS_PREFIX, "prefix.");
transformation.configure(props);
final SourceRecord createRecord = createCreateRecord();
final SourceRecord transformed = transformation.apply(createRecord);
assertThat(((Struct) transformed.value()).get("__op")).isEqualTo(Operation.CREATE.code());
assertThat(((Struct) transformed.value()).get("__ts_ms")).isNotNull();
assertThat(((Struct) transformed.value()).get("prefix.op")).isEqualTo(Operation.CREATE.code());
assertThat(((Struct) transformed.value()).get("prefix.ts_ms")).isNotNull();
}
@Test

View File

@ -93,8 +93,10 @@ public void configure(final Map<String, ?> configs) {
dropTombstones = config.getBoolean(ExtractNewRecordStateConfigDefinition.DROP_TOMBSTONES);
handleDeletes = DeleteHandling.parse(config.getString(ExtractNewRecordStateConfigDefinition.HANDLE_DELETES));
additionalFields = FieldReference.fromConfiguration(config.getString(ExtractNewRecordStateConfigDefinition.ADD_FIELDS));
additionalHeaders = FieldReference.fromConfiguration(config.getString(ExtractNewRecordStateConfigDefinition.ADD_HEADERS));
String addFieldsPrefix = config.getString(ExtractNewRecordStateConfigDefinition.ADD_FIELDS_PREFIX);
String addHeadersPrefix = config.getString(ExtractNewRecordStateConfigDefinition.ADD_HEADERS_PREFIX);
additionalFields = FieldReference.fromConfiguration(addFieldsPrefix, config.getString(ExtractNewRecordStateConfigDefinition.ADD_FIELDS));
additionalHeaders = FieldReference.fromConfiguration(addHeadersPrefix, config.getString(ExtractNewRecordStateConfigDefinition.ADD_HEADERS));
String routeFieldConfig = config.getString(ExtractNewRecordStateConfigDefinition.ROUTE_BY_FIELD);
routeByField = routeFieldConfig.isEmpty() ? null : routeFieldConfig;
@ -307,17 +309,22 @@ private static class FieldReference {
private final String field;
/**
* The name for the outgoing attribute/field, e.g. "__op" or "__source_ts_ms".
* The prefix for the new field name.
*/
private final String prefix;
/**
* The name for the outgoing attribute/field, e.g. "__op" or "__source_ts_ms" when the prefix is "__"
*/
private final String newFieldName;
private FieldReference(String field) {
private FieldReference(String prefix, String field) {
this.prefix = prefix;
String[] parts = FIELD_SEPARATOR.split(field);
if (parts.length == 1) {
this.struct = determineStruct(parts[0]);
this.field = parts[0];
this.newFieldName = ExtractNewRecordStateConfigDefinition.METADATA_FIELD_PREFIX + field;
this.newFieldName = prefix + field;
}
else if (parts.length == 2) {
this.struct = parts[0];
@ -327,7 +334,7 @@ else if (parts.length == 2) {
}
this.field = parts[1];
this.newFieldName = ExtractNewRecordStateConfigDefinition.METADATA_FIELD_PREFIX + this.struct + "_" + this.field;
this.newFieldName = prefix + this.struct + "_" + this.field;
}
else {
throw new IllegalArgumentException("Unexpected field name: " + field);
@ -351,14 +358,14 @@ else if (simpleFieldName.equals(TransactionMonitor.DEBEZIUM_TRANSACTION_ID_KEY)
}
}
static List<FieldReference> fromConfiguration(String addHeadersConfig) {
static List<FieldReference> fromConfiguration(String fieldPrefix, String addHeadersConfig) {
if (Strings.isNullOrEmpty(addHeadersConfig)) {
return Collections.emptyList();
}
else {
return Arrays.stream(addHeadersConfig.split(","))
.map(String::trim)
.map(FieldReference::new)
.map(field -> new FieldReference(fieldPrefix, field))
.collect(Collectors.toList());
}
}

View File

@ -94,6 +94,14 @@ public static DeleteHandling parse(String value, String defaultValue) {
.withImportance(ConfigDef.Importance.LOW)
.withDefault("");
/**
 * Configuration option {@code add.fields.prefix}: the string that is prepended to the
 * name of every field added to the record value through {@code add.fields}.
 * Defaults to {@code METADATA_FIELD_PREFIX}, so existing configurations keep their
 * current field names unless the option is set explicitly.
 */
public static final Field ADD_FIELDS_PREFIX = Field.create("add.fields.prefix")
.withDisplayName("Field prefix to be added to each field.")
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.LOW)
.withDefault(METADATA_FIELD_PREFIX)
.withDescription("Adds this prefix to each field listed.");
public static final Field ADD_FIELDS = Field.create("add.fields")
.withDisplayName("Adds the specified field(s) to the message if they exist.")
.withType(ConfigDef.Type.LIST)
@ -103,6 +111,14 @@ public static DeleteHandling parse(String value, String defaultValue) {
.withDescription("Adds each field listed, prefixed with __ (or __<struct>_ if the struct is specified) "
+ "Example: 'version,connector,source.ts_ms' would add __version, __connector and __source_ts_ms fields");
/**
 * Configuration option {@code add.headers.prefix}: the string that is prepended to the
 * name of every header added to the record through {@code add.headers}.
 * Defaults to {@code METADATA_FIELD_PREFIX}, so existing configurations keep their
 * current header names unless the option is set explicitly.
 */
public static final Field ADD_HEADERS_PREFIX = Field.create("add.headers.prefix")
.withDisplayName("Header prefix to be added to each header.")
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.LOW)
.withDefault(METADATA_FIELD_PREFIX)
// Wording mirrors ADD_FIELDS_PREFIX; the previous text ("Adds this prefix listed to each header.") was garbled.
.withDescription("Adds this prefix to each header listed.");
public static final Field ADD_HEADERS = Field.create("add.headers")
.withDisplayName("Adds the specified fields to the header if they exist.")
.withType(ConfigDef.Type.LIST)

View File

@ -33,6 +33,8 @@ public class ExtractNewRecordStateTest {
private static final String ROUTE_BY_FIELD = "route.by.field";
private static final String ADD_FIELDS = "add.fields";
private static final String ADD_HEADERS = "add.headers";
private static final String ADD_FIELDS_PREFIX = ADD_FIELDS + ".prefix";
private static final String ADD_HEADERS_PREFIX = ADD_HEADERS + ".prefix";
final Schema recordSchema = SchemaBuilder.struct()
.field("id", SchemaBuilder.int8())
@ -318,18 +320,19 @@ public void testAddField() {
}
@Test
@FixFor("DBZ-1452")
@FixFor({ "DBZ-1452", "DBZ-2504" })
public void testAddFields() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(ADD_FIELDS, "op , lsn,id");
props.put(ADD_FIELDS_PREFIX, "prefix.");
transform.configure(props);
final SourceRecord updateRecord = createUpdateRecord();
final SourceRecord unwrapped = transform.apply(updateRecord);
assertThat(((Struct) unwrapped.value()).get("__op")).isEqualTo(Envelope.Operation.UPDATE.code());
assertThat(((Struct) unwrapped.value()).get("__lsn")).isEqualTo(1234);
assertThat(((Struct) unwrapped.value()).get("__id")).isEqualTo("571");
assertThat(((Struct) unwrapped.value()).get("prefix.op")).isEqualTo(Envelope.Operation.UPDATE.code());
assertThat(((Struct) unwrapped.value()).get("prefix.lsn")).isEqualTo(1234);
assertThat(((Struct) unwrapped.value()).get("prefix.id")).isEqualTo("571");
}
}
@ -423,23 +426,24 @@ public void testAddHeadersForMissingOptionalField() {
}
@Test
@FixFor("DBZ-1452")
@FixFor({ "DBZ-1452", "DBZ-2504" })
public void testAddHeadersSpecifyStruct() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(ADD_HEADERS, "op,source.lsn,transaction.id,transaction.total_order");
props.put(ADD_HEADERS_PREFIX, "prefix.");
transform.configure(props);
final SourceRecord updateRecord = createUpdateRecord();
final SourceRecord unwrapped = transform.apply(updateRecord);
assertThat(unwrapped.headers()).hasSize(4);
String headerValue = getSourceRecordHeaderByKey(unwrapped, "__op");
String headerValue = getSourceRecordHeaderByKey(unwrapped, "prefix.op");
assertThat(headerValue).isEqualTo(Envelope.Operation.UPDATE.code());
headerValue = getSourceRecordHeaderByKey(unwrapped, "__source_lsn");
headerValue = getSourceRecordHeaderByKey(unwrapped, "prefix.source_lsn");
assertThat(headerValue).isEqualTo(String.valueOf(1234));
headerValue = getSourceRecordHeaderByKey(unwrapped, "__transaction_id");
headerValue = getSourceRecordHeaderByKey(unwrapped, "prefix.transaction_id");
assertThat(headerValue).isEqualTo(String.valueOf(571L));
headerValue = getSourceRecordHeaderByKey(unwrapped, "__transaction_total_order");
headerValue = getSourceRecordHeaderByKey(unwrapped, "prefix.transaction_total_order");
assertThat(headerValue).isEqualTo(String.valueOf(42L));
}
}

View File

@ -279,6 +279,11 @@ For example, configuration of `route.by.field=destination` routes records to the
+
If you are configuring the `ExtractNewRecordState` SMT on a sink connector, setting this option might be useful when the destination topic name dictates the name of the database table that will be updated with the simplified change event record. If the topic name is not correct for your use case, you can configure `route.by.field` to re-route the event.
[id="extract-new-record-state-add-fields-prefix"]
|{link-prefix}:{link-event-flattening}#extract-new-record-state-add-fields-prefix[`add.fields.prefix`]
| __ (double-underscore)
|Set this optional string to the prefix that the SMT prepends to the name of each field that `add.fields` adds to the record's value.
[id="extract-new-record-state-add-fields"]
|{link-prefix}:{link-event-flattening}#extract-new-record-state-add-fields[`add.fields`]
|
@ -286,7 +291,12 @@ If you are configuring the `ExtractNewRecordState` SMT on a sink connector, sett
+
When the SMT adds metadata fields to the simplified record's value, it prefixes each metadata field name with the value of `add.fields.prefix`, which defaults to a double underscore. For a struct specification, the SMT also inserts an underscore between the struct name and the field name. +
+
If you specify a field that is not in the change event record, the SMT still adds the field to the record's value.
If you specify a field that is not in the change event record, the SMT still adds the field to the record's value.
[id="extract-new-record-state-add-headers-prefix"]
|{link-prefix}:{link-event-flattening}#extract-new-record-state-add-headers-prefix[`add.headers.prefix`]
| __ (double-underscore)
|Set this optional string to the prefix that the SMT prepends to the name of each header that `add.headers` adds to the record.
[id="extract-new-record-state-add-headers"]
|{link-prefix}:{link-event-flattening}#extract-new-record-state-add-headers[`add.headers`]

View File

@ -271,6 +271,10 @@ For `DELETE` events, this option is only supported when the `delete.handling.mod
|`drop`
|The SMT can `drop`, `rewrite` or pass delete records (`none`). The `rewrite` mode will add a `__deleted` field set to `true` or `false` depending on the represented operation.
|[[mongodb-extract-new-record-state-add-headers-prefix]]<<mongodb-extract-new-record-state-add-headers-prefix, `add.headers.prefix`>>
|__ (double-underscore)
|Set this optional string to the prefix that the SMT prepends to the name of each header that `add.headers` adds to the flattened message.
|[[mongodb-extract-new-record-state-add-headers]]<<mongodb-extract-new-record-state-add-headers, `add.headers`>>
|
|Specify a list of metadata fields to add to header of the flattened message.
@ -278,6 +282,10 @@ In case of duplicate field names (e.g. "ts_ms" exists twice), the struct should
The fields will be prefixed with the value of `add.headers.prefix` (`pass:[__]` by default), or with that prefix followed by `<struct>pass:[_]` when the struct is specified.
Please use a comma separated list without spaces.
|[[mongodb-extract-new-record-state-add-fields-prefix]]<<mongodb-extract-new-record-state-add-fields-prefix, `add.fields.prefix`>>
|__ (double-underscore)
|Set this optional string to the prefix that the SMT prepends to the name of each field that `add.fields` adds to the flattened message.
|[[mongodb-extract-new-record-state-add-fields]]<<mongodb-extract-new-record-state-add-fields, `add.fields`>>
|
|Specify a list of metadata fields to add to the flattened message.