DBZ-971 Unwrap can add data operation header

This commit is contained in:
Renato Mefi 2018-11-13 13:47:54 +01:00 committed by Gunnar Morling
parent 0af8dcfde0
commit 8c1ebc2a8f
2 changed files with 56 additions and 2 deletions

View File

@ -9,8 +9,10 @@
import java.util.Map; import java.util.Map;
import io.debezium.config.EnumeratedValue; import io.debezium.config.EnumeratedValue;
import io.debezium.data.Envelope;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.transforms.ExtractField; import org.apache.kafka.connect.transforms.ExtractField;
import org.apache.kafka.connect.transforms.InsertField; import org.apache.kafka.connect.transforms.InsertField;
@ -39,6 +41,8 @@
*/ */
public class UnwrapFromEnvelope<R extends ConnectRecord<R>> implements Transformation<R> { public class UnwrapFromEnvelope<R extends ConnectRecord<R>> implements Transformation<R> {
final static String DEBEZIUM_OPERATION_HEADER_KEY = "__debezium-operation";
public static enum DeleteHandling implements EnumeratedValue { public static enum DeleteHandling implements EnumeratedValue {
DROP("drop"), DROP("drop"),
REWRITE("rewrite"), REWRITE("rewrite"),
@ -115,9 +119,19 @@ public static DeleteHandling parse(String value, String defaultValue) {
+ "drop - records are removed," + "drop - records are removed,"
+ "rewrite - __deleted field is added to records."); + "rewrite - __deleted field is added to records.");
private static final Field OPERATION_HEADER = Field.create("operation.header")
.withDisplayName("Adds the debezium operation into the message header")
.withType(ConfigDef.Type.BOOLEAN)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.LOW)
.withDefault(false)
.withDescription("Adds the operation {@link FieldName#OPERATION operation} as a header." +
"Its key is '" + DEBEZIUM_OPERATION_HEADER_KEY +"'");
private boolean dropTombstones; private boolean dropTombstones;
private boolean dropDeletes; private boolean dropDeletes;
private DeleteHandling handleDeletes; private DeleteHandling handleDeletes;
private boolean addOperationHeader;
private final ExtractField<R> afterDelegate = new ExtractField.Value<R>(); private final ExtractField<R> afterDelegate = new ExtractField.Value<R>();
private final ExtractField<R> beforeDelegate = new ExtractField.Value<R>(); private final ExtractField<R> beforeDelegate = new ExtractField.Value<R>();
private final InsertField<R> removedDelegate = new InsertField.Value<R>(); private final InsertField<R> removedDelegate = new InsertField.Value<R>();
@ -143,6 +157,8 @@ public void configure(final Map<String, ?> configs) {
} }
} }
addOperationHeader = config.getBoolean(OPERATION_HEADER);
Map<String, String> delegateConfig = new HashMap<>(); Map<String, String> delegateConfig = new HashMap<>();
delegateConfig.put("field", "before"); delegateConfig.put("field", "before");
beforeDelegate.configure(delegateConfig); beforeDelegate.configure(delegateConfig);
@ -164,13 +180,30 @@ public void configure(final Map<String, ?> configs) {
@Override @Override
public R apply(final R record) { public R apply(final R record) {
Envelope.Operation operation;
if (record.value() == null) { if (record.value() == null) {
if (dropTombstones) { if (dropTombstones) {
logger.trace("Tombstone {} arrived and requested to be dropped", record.key()); logger.trace("Tombstone {} arrived and requested to be dropped", record.key());
return null; return null;
} }
operation = Envelope.Operation.DELETE;
if (addOperationHeader) {
record.headers().addString(DEBEZIUM_OPERATION_HEADER_KEY, operation.toString());
}
return record; return record;
} }
if (addOperationHeader) {
String operationString = ((Struct) record.value()).getString("op");
operation = Envelope.Operation.forCode(operationString);
if (operationString.isEmpty() || operation == null) {
logger.warn("Unknown operation thus unable to add the operation header into the message");
} else {
record.headers().addString(DEBEZIUM_OPERATION_HEADER_KEY, operation.code());
}
}
if (record.valueSchema() == null || if (record.valueSchema() == null ||
record.valueSchema().name() == null || record.valueSchema().name() == null ||
!record.valueSchema().name().endsWith(ENVELOPE_SCHEMA_NAME_SUFFIX)) { !record.valueSchema().name().endsWith(ENVELOPE_SCHEMA_NAME_SUFFIX)) {
@ -206,7 +239,7 @@ public R apply(final R record) {
@Override @Override
public ConfigDef config() { public ConfigDef config() {
final ConfigDef config = new ConfigDef(); final ConfigDef config = new ConfigDef();
Field.group(config, null, DROP_TOMBSTONES, DROP_DELETES, HANDLE_DELETES); Field.group(config, null, DROP_TOMBSTONES, DROP_DELETES, HANDLE_DELETES, OPERATION_HEADER);
return config; return config;
} }

View File

@ -19,6 +19,7 @@
import org.junit.Test; import org.junit.Test;
import io.debezium.data.Envelope; import io.debezium.data.Envelope;
import io.debezium.doc.FixFor;
/** /**
* @author Jiri Pechanec * @author Jiri Pechanec
@ -28,6 +29,7 @@ public class UnwrapFromEnvelopeTest {
private static final String DROP_DELETES = "drop.deletes"; private static final String DROP_DELETES = "drop.deletes";
private static final String DROP_TOMBSTONES = "drop.tombstones"; private static final String DROP_TOMBSTONES = "drop.tombstones";
private static final String HANDLE_DELETES = "delete.handling.mode"; private static final String HANDLE_DELETES = "delete.handling.mode";
private static final String OPERATION_HEADER = "operation.header";
@Test @Test
public void testTombstoneDroppedByDefault() { public void testTombstoneDroppedByDefault() {
@ -108,6 +110,15 @@ private SourceRecord createUnknownUnnamedSchemaRecord() {
return new SourceRecord(new HashMap<>(), new HashMap<>(), "dummy", recordSchema, before); return new SourceRecord(new HashMap<>(), new HashMap<>(), "dummy", recordSchema, before);
} }
private String getSourceRecordHeaderByKey(SourceRecord record, String headerKey) {
Iterator<Header> operationHeader = record.headers().allWithName(headerKey);
if (!operationHeader.hasNext()) {
return null;
}
return operationHeader.next().value().toString();
}
@Test @Test
public void testDeleteDroppedByDefault() { public void testDeleteDroppedByDefault() {
try (final UnwrapFromEnvelope<SourceRecord> transform = new UnwrapFromEnvelope<>()) { try (final UnwrapFromEnvelope<SourceRecord> transform = new UnwrapFromEnvelope<>()) {
@ -124,6 +135,7 @@ public void testDeleteDroppedConfigured() {
try (final UnwrapFromEnvelope<SourceRecord> transform = new UnwrapFromEnvelope<>()) { try (final UnwrapFromEnvelope<SourceRecord> transform = new UnwrapFromEnvelope<>()) {
final Map<String, String> props = new HashMap<>(); final Map<String, String> props = new HashMap<>();
props.put(DROP_DELETES, "true"); props.put(DROP_DELETES, "true");
props.put(OPERATION_HEADER, "true");
transform.configure(props); transform.configure(props);
final SourceRecord deleteRecord = createDeleteRecord(); final SourceRecord deleteRecord = createDeleteRecord();
@ -132,15 +144,19 @@ public void testDeleteDroppedConfigured() {
} }
@Test @Test
public void testDeleteFrowardConfigured() { public void testDeleteForwardConfigured() {
try (final UnwrapFromEnvelope<SourceRecord> transform = new UnwrapFromEnvelope<>()) { try (final UnwrapFromEnvelope<SourceRecord> transform = new UnwrapFromEnvelope<>()) {
final Map<String, String> props = new HashMap<>(); final Map<String, String> props = new HashMap<>();
props.put(DROP_DELETES, "false"); props.put(DROP_DELETES, "false");
props.put(OPERATION_HEADER, "true");
transform.configure(props); transform.configure(props);
final SourceRecord deleteRecord = createDeleteRecord(); final SourceRecord deleteRecord = createDeleteRecord();
final SourceRecord tombstone = transform.apply(deleteRecord); final SourceRecord tombstone = transform.apply(deleteRecord);
assertThat(tombstone.value()).isNull(); assertThat(tombstone.value()).isNull();
assertThat(tombstone.headers()).hasSize(1);
String headerValue = getSourceRecordHeaderByKey(tombstone, transform.DEBEZIUM_OPERATION_HEADER_KEY);
assertThat(headerValue).isEqualTo(Envelope.Operation.DELETE.code());
} }
} }
@ -187,11 +203,15 @@ public void testHandleCreateRewrite() {
try (final UnwrapFromEnvelope<SourceRecord> transform = new UnwrapFromEnvelope<>()) { try (final UnwrapFromEnvelope<SourceRecord> transform = new UnwrapFromEnvelope<>()) {
final Map<String, String> props = new HashMap<>(); final Map<String, String> props = new HashMap<>();
props.put(HANDLE_DELETES, "rewrite"); props.put(HANDLE_DELETES, "rewrite");
props.put(OPERATION_HEADER, "true");
transform.configure(props); transform.configure(props);
final SourceRecord createRecord = createCreateRecord(); final SourceRecord createRecord = createCreateRecord();
final SourceRecord unwrapped = transform.apply(createRecord); final SourceRecord unwrapped = transform.apply(createRecord);
assertThat(((Struct)unwrapped.value()).getString("__deleted")).isEqualTo("false"); assertThat(((Struct)unwrapped.value()).getString("__deleted")).isEqualTo("false");
assertThat(unwrapped.headers()).hasSize(1);
String headerValue = getSourceRecordHeaderByKey(unwrapped, transform.DEBEZIUM_OPERATION_HEADER_KEY);
assertThat(headerValue).isEqualTo(Envelope.Operation.CREATE.code());
} }
} }
@ -222,6 +242,7 @@ public void testIgnoreUnknownRecord() {
} }
@Test @Test
@FixFor("DBZ-971")
public void testUnwrapPropagatesRecordHeaders() { public void testUnwrapPropagatesRecordHeaders() {
try (final UnwrapFromEnvelope<SourceRecord> transform = new UnwrapFromEnvelope<>()) { try (final UnwrapFromEnvelope<SourceRecord> transform = new UnwrapFromEnvelope<>()) {
final Map<String, String> props = new HashMap<>(); final Map<String, String> props = new HashMap<>();