DBZ-971 Mongo Unwrap can add data operation header

This commit is contained in:
Renato Mefi 2018-11-13 12:01:26 +01:00 committed by Gunnar Morling
parent 4b0a84f587
commit de7c065b43
2 changed files with 59 additions and 4 deletions

View File

@ -39,6 +39,8 @@
*/
public class UnwrapFromMongoDbEnvelope<R extends ConnectRecord<R>> implements Transformation<R> {
final static String DEBEZIUM_OPERATION_HEADER_KEY = "__debezium-operation";
public static enum ArrayEncoding implements EnumeratedValue {
ARRAY("array"),
DOCUMENT("document");
@ -111,6 +113,15 @@ public static ArrayEncoding parse(String value, String defaultValue) {
.withDescription("Delimiter to concat between field names from the input record when generating field names for the"
+ "output record.");
private static final Field OPERATION_HEADER = Field.create("operation.header")
.withDisplayName("Adds a message header representing the applied operation")
.withType(ConfigDef.Type.BOOLEAN)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.LOW)
.withDefault(false)
.withDescription("Adds the operation {@link FieldName#OPERATION operation} as a header." +
"Its key is '" + DEBEZIUM_OPERATION_HEADER_KEY +"'");
private static final Field DROP_TOMBSTONES = Field.create("drop.tombstones")
.withDisplayName("Drop tombstones")
.withType(ConfigDef.Type.BOOLEAN)
@ -128,6 +139,7 @@ public static ArrayEncoding parse(String value, String defaultValue) {
private MongoDataConverter converter;
private final Flatten<R> recordFlattener = new Flatten.Value<R>();
private boolean addOperationHeader;
private boolean flattenStruct;
private String delimiter;
@ -158,6 +170,10 @@ private R getTombstoneRecord(R r) {
@Override
public R apply(R r) {
if (addOperationHeader) {
r.headers().addString(DEBEZIUM_OPERATION_HEADER_KEY, ((Struct) r.value()).get("op").toString());
}
// Tombstone message
if (r.value() == null) {
if (dropTombstones) {
@ -281,9 +297,8 @@ public R apply(R r) {
}
if (flattenStruct) {
final R flattenRecord = recordFlattener.apply(r.newRecord(r.topic(), r.kafkaPartition(), finalKeySchema,
return recordFlattener.apply(r.newRecord(r.topic(), r.kafkaPartition(), finalKeySchema,
finalKeyStruct, finalValueSchema, finalValueStruct, r.timestamp(), r.headers()));
return flattenRecord;
}
else {
if (finalValueSchema.fields().isEmpty()) {
@ -311,7 +326,7 @@ public void close() {
@Override
public void configure(final Map<String, ?> map) {
final Configuration config = Configuration.from(map);
final Field.Set configFields = Field.setOf(ARRAY_ENCODING, FLATTEN_STRUCT, DELIMITER, DROP_TOMBSTONES);
final Field.Set configFields = Field.setOf(ARRAY_ENCODING, FLATTEN_STRUCT, DELIMITER, OPERATION_HEADER, DROP_TOMBSTONES);
if (!config.validateAndRecord(configFields, LOGGER::error)) {
throw new ConnectException("Unable to validate config.");
@ -319,6 +334,8 @@ public void configure(final Map<String, ?> map) {
converter = new MongoDataConverter(ArrayEncoding.parse(config.getString(ARRAY_ENCODING)));
addOperationHeader = config.getBoolean(OPERATION_HEADER);
flattenStruct = config.getBoolean(FLATTEN_STRUCT);
delimiter = config.getString(DELIMITER);

View File

@ -7,8 +7,15 @@
import static org.fest.assertions.Assertions.assertThat;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import io.debezium.data.Envelope;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
@ -44,6 +51,7 @@ public class UnwrapFromMongoDbEnvelopeTest {
private static final String SERVER_NAME = "serverX";
private static final String FLATTEN_STRUCT = "flatten.struct";
private static final String DELIMITER = "flatten.struct.delimiter";
private static final String OPERATION_HEADER = "operation.header";
private Filters filters;
private SourceInfo source;
@ -92,9 +100,19 @@ public void shouldTransformRecordForInsertEvent() throws InterruptedException {
assertThat(produced.size()).isEqualTo(1);
SourceRecord record = produced.get(0);
final Map<String, String> props = new HashMap<>();
props.put(OPERATION_HEADER, "true");
transformation.configure(props);
// when
SourceRecord transformed = transformation.apply(record);
// then assert operation header is insert
Iterator<Header> operationHeader = transformed.headers().allWithName(transformation.DEBEZIUM_OPERATION_HEADER_KEY);
assertThat((operationHeader).hasNext()).isTrue();
assertThat(operationHeader.next().value().toString()).isEqualTo(Envelope.Operation.CREATE.code());
// acquire key and value Structs
Struct key = (Struct) transformed.key();
Struct value = (Struct) transformed.value();
@ -182,9 +200,19 @@ public void shouldGenerateRecordForUpdateEvent() throws InterruptedException {
assertThat(produced.size()).isEqualTo(1);
SourceRecord record = produced.get(0);
final Map<String, String> props = new HashMap<>();
props.put(OPERATION_HEADER, "true");
transformation.configure(props);
// when
SourceRecord transformed = transformation.apply(record);
// then assert operation header is update
Iterator<Header> operationHeader = transformed.headers().allWithName(transformation.DEBEZIUM_OPERATION_HEADER_KEY);
assertThat((operationHeader).hasNext()).isTrue();
assertThat(operationHeader.next().value().toString()).isEqualTo(Envelope.Operation.UPDATE.code());
// acquire key and value Structs
Struct key = (Struct) transformed.key();
Struct value = (Struct) transformed.value();
@ -328,9 +356,19 @@ public void shouldGenerateRecordForDeleteEvent() throws InterruptedException {
SourceRecord record = produced.get(0);
final Map<String, String> props = new HashMap<>();
props.put(OPERATION_HEADER, "true");
transformation.configure(props);
// when
SourceRecord transformed = transformation.apply(record);
// then assert operation header is delete
Iterator<Header> operationHeader = transformed.headers().allWithName(transformation.DEBEZIUM_OPERATION_HEADER_KEY);
assertThat((operationHeader).hasNext()).isTrue();
assertThat(operationHeader.next().value().toString()).isEqualTo(Envelope.Operation.DELETE.code());
// acquire key and value Structs
Struct key = (Struct) transformed.key();
Struct value = (Struct) transformed.value();