DBZ-5283 Introduce a new SMT ExtractChangedRecordState to capture the names of changed/unchanged fields within headers

This commit is contained in:
harveyyue 2022-10-20 17:21:04 +08:00 committed by Jiri Pechanec
parent f3417f383f
commit 4cb6e7c5b3
6 changed files with 192 additions and 10 deletions

View File

@ -100,7 +100,6 @@ public double toDouble() {
/** /**
* Converts a value from its logical format (BigDecimal/special) to its string representation * Converts a value from its logical format (BigDecimal/special) to its string representation
* *
* @param struct the strut to put data in
* @return the encoded value * @return the encoded value
*/ */
@Override @Override
@ -144,7 +143,7 @@ else if (!decimalValue.equals(other.decimalValue)) {
} }
/** /**
* Returns a {@link SchemaBuilder} for a decimal number depending on {@link JdbcValueConverters.DecimalMode}. You * Returns a {@link SchemaBuilder} for a decimal number depending on {@link DecimalMode}. You
* can use the resulting schema builder to set additional schema settings such as required/optional, default value, * can use the resulting schema builder to set additional schema settings such as required/optional, default value,
* and documentation. * and documentation.
* *

View File

@ -64,7 +64,7 @@ public static Schema optionalSchema() {
* the scale of the number and a binary representation of the number. * the scale of the number and a binary representation of the number.
* *
* @param schema of the encoded value * @param schema of the encoded value
* @param decimalValue the value or the decimal * @param value the value or the decimal
* *
* @return the encoded value * @return the encoded value
*/ */

View File

@ -96,7 +96,7 @@ public static double[] parseWKBPoint(byte[] wkb) throws IllegalArgumentException
/** /**
* Creates a value for this schema using 2 given coordinates. * Creates a value for this schema using 2 given coordinates.
* *
* @param pointSchema a {@link Schema} instance which represents a point; may not be null * @param geomSchema a {@link Schema} instance which represents a point; may not be null
* @param x the X coordinate of the point; may not be null * @param x the X coordinate of the point; may not be null
* @param y the Y coordinate of the point; may not be null * @param y the Y coordinate of the point; may not be null
* @return a {@link Struct} which represents a Connect value for this schema; never null * @return a {@link Struct} which represents a Connect value for this schema; never null
@ -112,7 +112,7 @@ public static Struct createValue(Schema geomSchema, double x, double y) {
/** /**
* Create a value for this schema using WKB * Create a value for this schema using WKB
* @param pointSchema a {@link Schema} instance which represents a point; may not be null * @param geomSchema a {@link Schema} instance which represents a point; may not be null
* @param wkb the original Well-Known binary representation of the coordinate; may not be null * @param wkb the original Well-Known binary representation of the coordinate; may not be null
* @param srid the coordinate reference system identifier; null if unset/unknown * @param srid the coordinate reference system identifier; null if unset/unknown
* @return a {@link Struct} which represents a Connect value for this schema; never null * @return a {@link Struct} which represents a Connect value for this schema; never null

View File

@ -0,0 +1,116 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.transforms;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.util.Strings;
/**
 * An SMT that compares the {@code before} and {@code after} states of a Debezium change event and
 * publishes the names of the changed and unchanged fields to the record headers. Only update events
 * carry both states, so only they receive headers; all other records pass through unmodified.
 *
 * @param <R> the subtype of {@link ConnectRecord} on which this transformation will operate
 * @author Harvey Yue
 */
public class ExtractChangedRecordState<R extends ConnectRecord<R>> implements Transformation<R> {

    public static final Field HEADER_CHANGED_NAME = Field.create("header.changed.name")
            .withDisplayName("Header change name.")
            .withType(ConfigDef.Type.STRING)
            .withWidth(ConfigDef.Width.LONG)
            .withImportance(ConfigDef.Importance.LOW)
            .withDescription("Specify the header changed name, default is null which means not send changes to header.");

    public static final Field HEADER_UNCHANGED_NAME = Field.create("header.unchanged.name")
            .withDisplayName("Header unchanged name.")
            .withType(ConfigDef.Type.STRING)
            .withWidth(ConfigDef.Width.LONG)
            .withImportance(ConfigDef.Importance.LOW)
            .withDescription("Specify the header unchanged name of schema, default is null which means not send changes to header.");

    // Header names stay null unless configured; a null/blank name disables that header entirely.
    private String headerChangedName = null;
    private String headerUnchangedName = null;
    // Optional array-of-string schemas for the header values, named after the configured header names.
    private Schema changedSchema;
    private Schema unchangedSchema;
    private SmtManager<R> smtManager;

    @Override
    public void configure(final Map<String, ?> configs) {
        final Configuration config = Configuration.from(configs);
        smtManager = new SmtManager<>(config);

        headerChangedName = config.getString(HEADER_CHANGED_NAME);
        if (headerChangedName != null) {
            changedSchema = SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().name(headerChangedName).build();
        }
        headerUnchangedName = config.getString(HEADER_UNCHANGED_NAME);
        if (headerUnchangedName != null) {
            unchangedSchema = SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().name(headerUnchangedName).build();
        }
    }

    @Override
    public R apply(final R record) {
        // Tombstones carry no state to diff; pass them through so downstream consumers still see them.
        if (record.value() == null) {
            return record;
        }
        // Only operate on well-formed Debezium envelopes.
        if (!smtManager.isValidEnvelope(record)) {
            return record;
        }
        // Skip the field-by-field comparison when no target header was configured.
        if (Strings.isNullOrBlank(headerChangedName) && Strings.isNullOrBlank(headerUnchangedName)) {
            return record;
        }

        Struct value = requireStruct(record.value(), "Record value should be struct.");
        Object after = value.get("after");
        Object before = value.get("before");

        // Only update events populate both "before" and "after"; create/delete events are left as-is.
        if (after != null && before != null) {
            List<String> changedNames = new ArrayList<>();
            List<String> unchangedNames = new ArrayList<>();
            Struct afterValue = requireStruct(after, "After value should be struct.");
            Struct beforeValue = requireStruct(before, "Before value should be struct.");
            afterValue.schema().fields().forEach(field -> {
                // Objects.equals handles null field values on either side.
                if (!Objects.equals(afterValue.get(field), beforeValue.get(field))) {
                    changedNames.add(field.name());
                }
                else {
                    unchangedNames.add(field.name());
                }
            });
            if (!Strings.isNullOrBlank(headerChangedName)) {
                record.headers().add(headerChangedName, changedNames, changedSchema);
            }
            if (!Strings.isNullOrBlank(headerUnchangedName)) {
                record.headers().add(headerUnchangedName, unchangedNames, unchangedSchema);
            }
        }
        return record;
    }

    @Override
    public void close() {
    }

    @Override
    public ConfigDef config() {
        final ConfigDef config = new ConfigDef();
        Field.group(config, null, HEADER_CHANGED_NAME, HEADER_UNCHANGED_NAME);
        return config;
    }
}

View File

@ -74,10 +74,10 @@ public class ExtractNewRecordState<R extends ConnectRecord<R>> implements Transf
private List<FieldReference> additionalHeaders; private List<FieldReference> additionalHeaders;
private List<FieldReference> additionalFields; private List<FieldReference> additionalFields;
private String routeByField; private String routeByField;
private final ExtractField<R> afterDelegate = new ExtractField.Value<R>(); private final ExtractField<R> afterDelegate = new ExtractField.Value<>();
private final ExtractField<R> beforeDelegate = new ExtractField.Value<R>(); private final ExtractField<R> beforeDelegate = new ExtractField.Value<>();
private final InsertField<R> removedDelegate = new InsertField.Value<R>(); private final InsertField<R> removedDelegate = new InsertField.Value<>();
private final InsertField<R> updatedDelegate = new InsertField.Value<R>(); private final InsertField<R> updatedDelegate = new InsertField.Value<>();
private BoundedConcurrentHashMap<Schema, Schema> schemaUpdateCache; private BoundedConcurrentHashMap<Schema, Schema> schemaUpdateCache;
private SmtManager<R> smtManager; private SmtManager<R> smtManager;

View File

@ -8,8 +8,10 @@
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
@ -226,6 +228,14 @@ private SourceRecord createUnknownUnnamedSchemaRecord() {
return new SourceRecord(new HashMap<>(), new HashMap<>(), "dummy", recordSchema, before); return new SourceRecord(new HashMap<>(), new HashMap<>(), "dummy", recordSchema, before);
} }
/**
 * Returns the first header with the given key from the record, or {@code null} if absent.
 */
private Header getSourceRecordHeader(SourceRecord record, String headerKey) {
    Iterator<Header> matches = record.headers().allWithName(headerKey);
    return matches.hasNext() ? matches.next() : null;
}
private String getSourceRecordHeaderByKey(SourceRecord record, String headerKey) { private String getSourceRecordHeaderByKey(SourceRecord record, String headerKey) {
Iterator<Header> operationHeader = record.headers().allWithName(headerKey); Iterator<Header> operationHeader = record.headers().allWithName(headerKey);
if (!operationHeader.hasNext()) { if (!operationHeader.hasNext()) {
@ -233,7 +243,6 @@ private String getSourceRecordHeaderByKey(SourceRecord record, String headerKey)
} }
Object value = operationHeader.next().value(); Object value = operationHeader.next().value();
return value != null ? value.toString() : null; return value != null ? value.toString() : null;
} }
@ -726,4 +735,62 @@ public void testSchemaChangeEventWithOperationHeader() {
assertThat(transform.apply(unnamedSchemaRecord)).isEqualTo(unnamedSchemaRecord); assertThat(transform.apply(unnamedSchemaRecord)).isEqualTo(unnamedSchemaRecord);
} }
} }
@Test
@FixFor("DBZ-5283")
@SuppressWarnings("unchecked")
public void testAddUpdatedFieldToHeaders() {
    // Both headers configured: changed and unchanged field names are each captured in their header.
    try (final ExtractChangedRecordState<SourceRecord> transform = new ExtractChangedRecordState<>()) {
        final Map<String, String> props = new HashMap<>();
        props.put("header.changed.name", "Changed");
        props.put("header.unchanged.name", "Unchanged");
        transform.configure(props);

        final SourceRecord updatedRecord = createUpdateRecord();
        final SourceRecord transformRecord = transform.apply(updatedRecord);
        final Header changedHeader = getSourceRecordHeader(transformRecord, "Changed");
        final List<String> changedHeaderValues = (List<String>) changedHeader.value();
        final Header unchangedHeader = getSourceRecordHeader(transformRecord, "Unchanged");
        // Cast to the List interface, not ArrayList: the concrete collection type is an
        // implementation detail of the SMT and the test should not depend on it.
        final List<String> unchangedHeaderValues = (List<String>) unchangedHeader.value();

        assertThat(transformRecord.headers().size()).isEqualTo(2);
        assertThat(changedHeaderValues.size()).isEqualTo(1);
        assertThat(changedHeaderValues.get(0)).isEqualTo("name");
        assertThat(changedHeader.schema().name()).isEqualTo("Changed");
        assertThat(unchangedHeaderValues.size()).isEqualTo(1);
        assertThat(unchangedHeaderValues.get(0)).isEqualTo("id");
    }

    // Only the changed header configured: no unchanged header may be added.
    try (final ExtractChangedRecordState<SourceRecord> transform = new ExtractChangedRecordState<>()) {
        final Map<String, String> props = new HashMap<>();
        props.put("header.changed.name", "Changed");
        transform.configure(props);

        final SourceRecord updatedRecord = createUpdateRecord();
        final SourceRecord transformRecord = transform.apply(updatedRecord);
        final Header changedHeader = getSourceRecordHeader(transformRecord, "Changed");
        final List<String> changedHeaderValues = (List<String>) changedHeader.value();
        final Header unchangedHeader = getSourceRecordHeader(transformRecord, "Unchanged");

        assertThat(transformRecord.headers().size()).isEqualTo(1);
        assertThat(changedHeaderValues.contains("name")).isTrue();
        assertThat(unchangedHeader).isNull();
    }
}
@Test
@FixFor("DBZ-5283")
public void testAddCreatedFieldToHeaders() {
    // A create event has no "before" state, so no changed/unchanged headers are produced.
    try (final ExtractChangedRecordState<SourceRecord> transform = new ExtractChangedRecordState<>()) {
        final Map<String, String> props = new HashMap<>();
        props.put("header.changed.name", "Changed");
        props.put("header.unchanged.name", "Unchanged");
        // Configure exactly once; the previous redundant configure() with an empty map was dead code.
        transform.configure(props);

        final SourceRecord createdRecord = createCreateRecord();
        final SourceRecord transformRecord = transform.apply(createdRecord);
        assertThat(transformRecord.headers()).isEmpty();
    }
}
} }