DBZ-6907 Introduce a new deleted-record handling option "delete.tombstone.handling.mode"

This commit is contained in:
harveyyue 2023-10-11 19:49:54 +08:00 committed by Jiri Pechanec
parent fccaf37549
commit 4836fcf1ea
8 changed files with 427 additions and 123 deletions

View File

@ -39,7 +39,6 @@
import io.debezium.schema.FieldNameSelector;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.transforms.AbstractExtractNewRecordState;
import io.debezium.transforms.ExtractNewRecordStateConfigDefinition.DeleteHandling;
/**
* Debezium Mongo Connector generates the CDC records in String format. Sink connectors usually are not able to parse
@ -131,7 +130,6 @@ public static ArrayEncoding parse(String value, String defaultValue) {
.withDescription("Delimiter to concat between field names from the input record when generating field names for the"
+ "output record.");
private final ExtractField<R> updateDescriptionExtractor = new ExtractField.Value<>();
private final ExtractField<R> keyExtractor = new ExtractField.Key<>();
private final Flatten<R> recordFlattener = new Flatten.Value<>();
private MongoDataConverter converter;
@ -155,15 +153,11 @@ public void configure(final Map<String, ?> configs) {
flattenStruct = config.getBoolean(FLATTEN_STRUCT);
delimiter = config.getString(DELIMITER);
final Map<String, String> updateDescriptionExtractorConfig = new HashMap<>();
updateDescriptionExtractorConfig.put("field", MongoDbFieldName.UPDATE_DESCRIPTION);
updateDescriptionExtractor.configure(updateDescriptionExtractorConfig);
Map<String, String> delegateConfig = new HashMap<>();
delegateConfig.put("field", "id");
keyExtractor.configure(delegateConfig);
final Map<String, String> keyExtractorConfig = new HashMap<>();
keyExtractorConfig.put("field", "id");
keyExtractor.configure(keyExtractorConfig);
final Map<String, String> delegateConfig = new HashMap<>();
delegateConfig = new HashMap<>();
delegateConfig.put("delimiter", delimiter);
recordFlattener.configure(delegateConfig);
}
@ -173,6 +167,11 @@ public R doApply(R record) {
if (!smtManager.isValidKey(record)) {
return record;
}
// Add headers if needed
if (!additionalHeaders.isEmpty()) {
Headers headersToAdd = makeHeaders(additionalHeaders, (Struct) record.value());
headersToAdd.forEach(h -> record.headers().add(h));
}
final R keyRecord = keyExtractor.apply(record);
@ -181,14 +180,10 @@ public R doApply(R record) {
// Tombstone message
if (record.value() == null) {
if (dropTombstones) {
LOGGER.trace("Tombstone {} arrived and requested to be dropped", record.key());
R newRecord = extractRecordStrategy.handleTruncateRecord(record);
if (newRecord == null) {
return null;
}
if (!additionalHeaders.isEmpty()) {
Headers headersToAdd = makeHeaders(additionalHeaders, (Struct) record.value());
headersToAdd.forEach(h -> record.headers().add(h));
}
return newRecord(record, keyDocument, valueDocument);
}
@ -196,41 +191,35 @@ public R doApply(R record) {
return record;
}
final R beforeRecord = beforeDelegate.apply(record);
final R afterRecord = afterDelegate.apply(record);
final R updateDescriptionRecord = updateDescriptionExtractor.apply(record);
if (!additionalHeaders.isEmpty()) {
Headers headersToAdd = makeHeaders(additionalHeaders, (Struct) record.value());
headersToAdd.forEach(h -> record.headers().add(h));
final R afterRecord = extractRecordStrategy.afterDelegate().apply(record);
final R updateDescriptionRecord = extractRecordStrategy.updateDescriptionDelegate().apply(record);
boolean isDeletion = false;
R newRecord;
if (afterRecord.value() == null && updateDescriptionRecord.value() == null) {
// Handling delete records
isDeletion = true;
newRecord = extractRecordStrategy.handleDeleteRecord(record);
if (newRecord == null) {
return null;
}
}
else {
// Handling insert and update records
newRecord = extractRecordStrategy.handleRecord(record);
}
// insert || replace || update with capture.mode="change_streams_update_full" or "change_streams_update_full_with_pre_image"
if (afterRecord.value() != null) {
valueDocument = getFullDocument(afterRecord, keyDocument);
if (newRecord.value() != null) {
valueDocument = getFullDocument(newRecord, keyDocument);
}
// update
if (afterRecord.value() == null && updateDescriptionRecord.value() != null) {
valueDocument = getPartialUpdateDocument(beforeRecord, updateDescriptionRecord, keyDocument);
if (newRecord.value() == null && updateDescriptionRecord.value() != null) {
valueDocument = getPartialUpdateDocument(newRecord, updateDescriptionRecord, keyDocument);
}
boolean isDeletion = false;
// delete
if (afterRecord.value() == null && updateDescriptionRecord.value() == null) {
if (handleDeletes.equals(DeleteHandling.DROP)) {
LOGGER.trace("Delete {} arrived and requested to be dropped", record.key());
return null;
}
if (beforeRecord.value() != null && handleDeletes.equals(DeleteHandling.REWRITE)) {
valueDocument = getFullDocument(beforeRecord, keyDocument);
}
isDeletion = true;
}
if (handleDeletes.equals(DeleteHandling.REWRITE)) {
// add rewrite field
if (extractRecordStrategy.isRewriteMode()) {
valueDocument.append(DELETED_FIELD, new BsonBoolean(isDeletion));
}
@ -252,7 +241,6 @@ public ConfigDef config() {
@Override
public void close() {
super.close();
updateDescriptionExtractor.close();
keyExtractor.close();
recordFlattener.close();
}

View File

@ -5,8 +5,6 @@
*/
package io.debezium.transforms;
import static io.debezium.data.Envelope.FieldName.AFTER;
import static io.debezium.data.Envelope.FieldName.BEFORE;
import static io.debezium.data.Envelope.FieldName.OPERATION;
import static io.debezium.data.Envelope.FieldName.SOURCE;
import static io.debezium.data.Envelope.FieldName.TIMESTAMP;
@ -18,15 +16,13 @@
import static io.debezium.transforms.ExtractNewRecordStateConfigDefinition.ADD_FIELDS_PREFIX;
import static io.debezium.transforms.ExtractNewRecordStateConfigDefinition.ADD_HEADERS;
import static io.debezium.transforms.ExtractNewRecordStateConfigDefinition.ADD_HEADERS_PREFIX;
import static io.debezium.transforms.ExtractNewRecordStateConfigDefinition.DELETED_FIELD;
import static io.debezium.transforms.ExtractNewRecordStateConfigDefinition.DROP_TOMBSTONES;
import static io.debezium.transforms.ExtractNewRecordStateConfigDefinition.HANDLE_DELETES;
import static io.debezium.transforms.ExtractNewRecordStateConfigDefinition.HANDLE_TOMBSTONE_DELETES;
import static io.debezium.transforms.ExtractNewRecordStateConfigDefinition.ROUTE_BY_FIELD;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -41,8 +37,6 @@
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.transforms.ExtractField;
import org.apache.kafka.connect.transforms.InsertField;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
import org.slf4j.Logger;
@ -52,6 +46,10 @@
import io.debezium.config.Field;
import io.debezium.data.Envelope;
import io.debezium.transforms.ExtractNewRecordStateConfigDefinition.DeleteHandling;
import io.debezium.transforms.ExtractNewRecordStateConfigDefinition.DeleteTombstoneHandling;
import io.debezium.transforms.strategy.DeleteExtractRecordStrategy;
import io.debezium.transforms.strategy.DeleteTombstoneExtractRecordStrategy;
import io.debezium.transforms.strategy.ExtractRecordStrategy;
import io.debezium.util.Strings;
/**
@ -68,15 +66,9 @@ public abstract class AbstractExtractNewRecordState<R extends ConnectRecord<R>>
private static final String UPDATE_DESCRIPTION = "updateDescription";
protected static final String PURPOSE = "source field insertion";
protected final ExtractField<R> afterDelegate = new ExtractField.Value<>();
protected final ExtractField<R> beforeDelegate = new ExtractField.Value<>();
protected final InsertField<R> removedDelegate = new InsertField.Value<>();
protected final InsertField<R> updatedDelegate = new InsertField.Value<>();
protected Configuration config;
protected SmtManager<R> smtManager;
protected boolean dropTombstones;
protected DeleteHandling handleDeletes;
protected ExtractRecordStrategy<R> extractRecordStrategy;
protected List<FieldReference> additionalHeaders;
protected List<FieldReference> additionalFields;
protected String routeByField;
@ -90,33 +82,26 @@ public void configure(final Map<String, ?> configs) {
throw new ConnectException("Unable to validate config.");
}
dropTombstones = config.getBoolean(DROP_TOMBSTONES);
handleDeletes = DeleteHandling.parse(config.getString(HANDLE_DELETES));
String addFieldsPrefix = config.getString(ADD_FIELDS_PREFIX);
String addHeadersPrefix = config.getString(ADD_HEADERS_PREFIX);
additionalFields = FieldReference.fromConfiguration(addFieldsPrefix, config.getString(ADD_FIELDS));
additionalHeaders = FieldReference.fromConfiguration(addHeadersPrefix, config.getString(ADD_HEADERS));
String routeFieldConfig = config.getString(ROUTE_BY_FIELD);
routeByField = routeFieldConfig.isEmpty() ? null : routeFieldConfig;
Map<String, String> delegateConfig = new LinkedHashMap<>();
delegateConfig.put("field", BEFORE);
beforeDelegate.configure(delegateConfig);
delegateConfig = new HashMap<>();
delegateConfig.put("field", AFTER);
afterDelegate.configure(delegateConfig);
delegateConfig = new HashMap<>();
delegateConfig.put("static.field", DELETED_FIELD);
delegateConfig.put("static.value", "true");
removedDelegate.configure(delegateConfig);
delegateConfig = new HashMap<>();
delegateConfig.put("static.field", DELETED_FIELD);
delegateConfig.put("static.value", "false");
updatedDelegate.configure(delegateConfig);
// handle deleted records
if (!Strings.isNullOrBlank(config.getString(HANDLE_TOMBSTONE_DELETES))) {
DeleteTombstoneHandling deleteTombstoneHandling = DeleteTombstoneHandling.parse(config.getString(HANDLE_TOMBSTONE_DELETES));
extractRecordStrategy = new DeleteTombstoneExtractRecordStrategy<>(deleteTombstoneHandling);
}
else {
// will be removed in further release
boolean dropTombstones = config.getBoolean(DROP_TOMBSTONES);
DeleteHandling handleDeletes = DeleteHandling.parse(config.getString(HANDLE_DELETES));
extractRecordStrategy = new DeleteExtractRecordStrategy<>(handleDeletes, dropTombstones);
LOGGER.warn(
"The deleted record handling configs \"drop.tombstones\" and \"delete.handling.mode\" have been deprecated, " +
"please use \"delete.tombstone.handling.mode\" instead of.");
}
}
@Override
@ -130,10 +115,9 @@ public R apply(final R record) {
@Override
public void close() {
beforeDelegate.close();
afterDelegate.close();
removedDelegate.close();
updatedDelegate.close();
if (extractRecordStrategy != null) {
extractRecordStrategy.close();
}
}
/**

View File

@ -102,33 +102,34 @@ public void configure(final Map<String, ?> configs) {
@Override
public R doApply(final R record) {
if (record.value() == null) {
if (dropTombstones) {
LOGGER.trace("Tombstone {} arrived and requested to be dropped", record.key());
return null;
}
// Add headers if needed
if (!additionalHeaders.isEmpty()) {
Headers headersToAdd = makeHeaders(additionalHeaders, (Struct) record.value());
headersToAdd.forEach(h -> record.headers().add(h));
}
return record;
// Handling tombstone record
if (record.value() == null) {
return extractRecordStrategy.handleTruncateRecord(record);
}
if (!smtManager.isValidEnvelope(record)) {
return record;
}
if (!additionalHeaders.isEmpty()) {
Headers headersToAdd = makeHeaders(additionalHeaders, (Struct) record.value());
headersToAdd.forEach(h -> record.headers().add(h));
}
R newRecord = afterDelegate.apply(record);
if (newRecord.value() == null && beforeDelegate.apply(record).value() == null) {
R newRecord = extractRecordStrategy.afterDelegate().apply(record);
if (newRecord.value() == null && extractRecordStrategy.beforeDelegate().apply(record).value() == null) {
LOGGER.trace("Truncate event arrived and requested to be dropped");
return null;
}
if (newRecord.value() == null) {
// Handling delete records
newRecord = extractRecordStrategy.handleDeleteRecord(record);
if (newRecord == null) {
return null;
}
if (routeByField != null) {
Struct recordValue = requireStruct(record.value(), "Read record to set topic routing for DELETE");
String newTopicName = recordValue.getStruct("before").getString(routeByField);
@ -139,22 +140,13 @@ public R doApply(final R record) {
newRecord = dropFields(newRecord);
}
// Handling delete records
switch (handleDeletes) {
case DROP:
LOGGER.trace("Delete message {} requested to be dropped", record.key());
return null;
case REWRITE:
LOGGER.trace("Delete message {} requested to be rewritten", record.key());
R oldRecord = beforeDelegate.apply(record);
oldRecord = addFields(additionalFields, record, oldRecord);
return removedDelegate.apply(oldRecord);
default:
return newRecord;
if (newRecord.value() != null) {
newRecord = addFields(additionalFields, record, newRecord);
}
}
else {
// Handling insert and update records
newRecord = extractRecordStrategy.handleRecord(record);
// Add on any requested source fields from the original record to the new unwrapped record
if (routeByField != null) {
Struct recordValue = requireStruct(newRecord.value(), "Read record to set topic routing for CREATE / UPDATE");
@ -167,17 +159,9 @@ public R doApply(final R record) {
if (!Strings.isNullOrEmpty(dropFieldsHeaderName)) {
newRecord = dropFields(newRecord);
}
// Handling insert and update records
switch (handleDeletes) {
case REWRITE:
LOGGER.trace("Insert/update message {} requested to be rewritten", record.key());
return updatedDelegate.apply(newRecord);
default:
}
return newRecord;
}
}
}
@Override
public Iterable<Field> validateConfigFields() {

View File

@ -67,6 +67,71 @@ public static DeleteHandling parse(String value, String defaultValue) {
}
}
public enum DeleteTombstoneHandling implements EnumeratedValue {
    // Remove both the delete record and the subsequent tombstone record.
    DROP("drop"),
    // Convert the delete record into a tombstone and drop the original tombstone record.
    TOMBSTONE("tombstone"),
    // Unwrap the delete record (adding the "__deleted" marker field) and drop the tombstone record.
    REWRITE("rewrite"),
    // Unwrap the delete record (adding the "__deleted" marker field) and keep the tombstone record.
    REWRITE_WITH_TOMBSTONE("rewrite-with-tombstone");

    private final String value;

    DeleteTombstoneHandling(String value) {
        this.value = value;
    }

    @Override
    public String getValue() {
        return value;
    }

    /**
     * Determine if the supplied value is one of the predefined options.
     *
     * @param value the configuration property value; may be null
     * @return the matching option (compared case-insensitively, ignoring surrounding
     *         whitespace), or null if no match is found
     */
    public static DeleteTombstoneHandling parse(String value) {
        if (value == null) {
            return null;
        }
        value = value.trim();
        for (DeleteTombstoneHandling option : DeleteTombstoneHandling.values()) {
            if (option.getValue().equalsIgnoreCase(value)) {
                return option;
            }
        }
        return null;
    }

    /**
     * Determine if the supplied value is one of the predefined options, falling back to
     * the supplied default when the value itself does not match.
     *
     * @param value the configuration property value; may be null
     * @param defaultValue the default value; may be null
     * @return the matching option, or null if neither the value nor the non-null default matches
     */
    public static DeleteTombstoneHandling parse(String value, String defaultValue) {
        DeleteTombstoneHandling mode = parse(value);
        if (mode == null && defaultValue != null) {
            mode = parse(defaultValue);
        }
        return mode;
    }
}
public static final Field HANDLE_TOMBSTONE_DELETES = Field.create("delete.tombstone.handling.mode")
.withDisplayName("Handle delete records")
.withEnum(DeleteTombstoneHandling.class)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.MEDIUM)
.optional()
.withDescription("How to handle delete records. Options are: "
+ "drop - will remove both delete and tombstone,"
+ "tombstone - will convert delete to tombstone and remove tombstone (the default),"
+ "rewrite - will convert delete and remove tombstone, __deleted field is added to records,"
+ "rewrite-with-tombstone - will convert delete and keep tombstone, __deleted field is added to records.");
@Deprecated
public static final Field DROP_TOMBSTONES = Field.create("drop.tombstones")
.withDisplayName("Drop tombstones")
.withType(ConfigDef.Type.BOOLEAN)
@ -75,8 +140,10 @@ public static DeleteHandling parse(String value, String defaultValue) {
.withDefault(true)
.withDescription("Debezium by default generates a tombstone record to enable Kafka compaction after "
+ "a delete record was generated. This record is usually filtered out to avoid duplicates "
+ "as a delete record is converted to a tombstone record, too");
+ "as a delete record is converted to a tombstone record, too"
+ "Note: will be removed in further release, use \"delete.tombstone.handling.mode\" instead");
@Deprecated
public static final Field HANDLE_DELETES = Field.create("delete.handling.mode")
.withDisplayName("Handle delete records")
.withEnum(DeleteHandling.class, DeleteHandling.DROP)
@ -85,7 +152,8 @@ public static DeleteHandling parse(String value, String defaultValue) {
.withDescription("How to handle delete records. Options are: "
+ "none - records are passed,"
+ "drop - records are removed (the default),"
+ "rewrite - __deleted field is added to records.");
+ "rewrite - __deleted field is added to records."
+ "Note: will be removed in further release, use \"delete.tombstone.handling.mode\" instead");
public static final Field ROUTE_BY_FIELD = Field.create("route.by.field")
.withDisplayName("Route by field name")

View File

@ -0,0 +1,85 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.transforms.strategy;
import static io.debezium.data.Envelope.FieldName.AFTER;
import static io.debezium.data.Envelope.FieldName.BEFORE;
import static io.debezium.transforms.ExtractNewRecordStateConfigDefinition.DELETED_FIELD;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.ExtractField;
import org.apache.kafka.connect.transforms.InsertField;
/**
 * Base implementation of {@link ExtractRecordStrategy} that wires up the Kafka Connect
 * delegate transformations shared by all concrete strategies: field extractors for the
 * envelope's before/after state and the "__deleted" marker inserters used by rewrite modes.
 *
 * @author Harvey Yue
 */
public abstract class AbstractExtractRecordStrategy<R extends ConnectRecord<R>> implements ExtractRecordStrategy<R> {

    private static final String UPDATE_DESCRIPTION = "updateDescription";

    protected final ExtractField<R> afterDelegate = new ExtractField.Value<>();
    protected final ExtractField<R> beforeDelegate = new ExtractField.Value<>();
    protected final InsertField<R> removedDelegate = new InsertField.Value<>();
    protected final InsertField<R> updatedDelegate = new InsertField.Value<>();
    // for mongodb
    protected final ExtractField<R> updateDescriptionDelegate = new ExtractField.Value<>();

    public AbstractExtractRecordStrategy() {
        init();
    }

    // Configure every delegate transformation once, up front.
    private void init() {
        beforeDelegate.configure(configOf("field", BEFORE));
        afterDelegate.configure(configOf("field", AFTER));
        updateDescriptionDelegate.configure(configOf("field", UPDATE_DESCRIPTION));

        // Inserter that marks a record as deleted (__deleted = "true").
        Map<String, String> removedConfig = configOf("static.field", DELETED_FIELD);
        removedConfig.put("static.value", "true");
        removedDelegate.configure(removedConfig);

        // Inserter that marks a record as not deleted (__deleted = "false").
        Map<String, String> updatedConfig = configOf("static.field", DELETED_FIELD);
        updatedConfig.put("static.value", "false");
        updatedDelegate.configure(updatedConfig);
    }

    // Build a mutable single-entry configuration map for a delegate transformation.
    private static Map<String, String> configOf(String key, String value) {
        Map<String, String> config = new HashMap<>();
        config.put(key, value);
        return config;
    }

    @Override
    public ExtractField<R> afterDelegate() {
        return afterDelegate;
    }

    @Override
    public ExtractField<R> beforeDelegate() {
        return beforeDelegate;
    }

    @Override
    public ExtractField<R> updateDescriptionDelegate() {
        return updateDescriptionDelegate;
    }

    @Override
    public void close() {
        // Release every delegate transformation.
        beforeDelegate.close();
        afterDelegate.close();
        removedDelegate.close();
        updatedDelegate.close();
        updateDescriptionDelegate.close();
    }
}

View File

@ -0,0 +1,78 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.transforms.strategy;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.transforms.ExtractNewRecordStateConfigDefinition.DeleteHandling;
/**
 * Deprecated, use {@link DeleteTombstoneExtractRecordStrategy} instead.
 * <p>
 * Implements the legacy pair of options "drop.tombstones" and "delete.handling.mode".
 *
 * @author Harvey Yue
 */
@Deprecated
public class DeleteExtractRecordStrategy<R extends ConnectRecord<R>> extends AbstractExtractRecordStrategy<R> {

    private static final Logger LOGGER = LoggerFactory.getLogger(DeleteExtractRecordStrategy.class);

    private final DeleteHandling deleteHandling;
    private final boolean dropTombstones;

    public DeleteExtractRecordStrategy(DeleteHandling deleteHandling, boolean dropTombstones) {
        this.deleteHandling = deleteHandling;
        this.dropTombstones = dropTombstones;
    }

    @Override
    public R handleTruncateRecord(R record) {
        // Tombstones are either dropped entirely or passed through untouched.
        if (!dropTombstones) {
            return record;
        }
        LOGGER.trace("Tombstone {} arrived and requested to be dropped", record.key());
        return null;
    }

    @Override
    public R handleDeleteRecord(R record) {
        if (deleteHandling == DeleteHandling.DROP) {
            LOGGER.trace("Delete message {} requested to be dropped", record.key());
            return null;
        }
        if (deleteHandling == DeleteHandling.NONE) {
            // Pass through: unwrap the (null) after state.
            return afterDelegate.apply(record);
        }
        if (deleteHandling == DeleteHandling.REWRITE) {
            LOGGER.trace("Delete message {} requested to be rewritten", record.key());
            R unwrapped = beforeDelegate.apply(record);
            // Only struct values can carry the "__deleted" marker field.
            if (unwrapped.value() instanceof Struct) {
                return removedDelegate.apply(unwrapped);
            }
            return unwrapped;
        }
        throw new DebeziumException("Unknown delete handling mode: " + deleteHandling);
    }

    @Override
    public R handleRecord(R record) {
        R unwrapped = afterDelegate.apply(record);
        if (deleteHandling == DeleteHandling.REWRITE) {
            LOGGER.trace("Insert/update message {} requested to be rewritten", record.key());
            // Only struct values can carry the "__deleted" marker field.
            if (unwrapped.value() instanceof Struct) {
                return updatedDelegate.apply(unwrapped);
            }
        }
        return unwrapped;
    }

    @Override
    public boolean isRewriteMode() {
        return deleteHandling == DeleteHandling.REWRITE;
    }
}

View File

@ -0,0 +1,80 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.transforms.strategy;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.transforms.ExtractNewRecordStateConfigDefinition.DeleteTombstoneHandling;
/**
 * The default {@link AbstractExtractRecordStrategy} implementation, driven by the
 * consolidated "delete.tombstone.handling.mode" option.
 *
 * @author Harvey Yue
 */
public class DeleteTombstoneExtractRecordStrategy<R extends ConnectRecord<R>> extends AbstractExtractRecordStrategy<R> {

    private static final Logger LOGGER = LoggerFactory.getLogger(DeleteTombstoneExtractRecordStrategy.class);

    private final DeleteTombstoneHandling deleteTombstoneHandling;

    public DeleteTombstoneExtractRecordStrategy(DeleteTombstoneHandling deleteTombstoneHandling) {
        this.deleteTombstoneHandling = deleteTombstoneHandling;
    }

    @Override
    public R handleTruncateRecord(R record) {
        if (deleteTombstoneHandling == DeleteTombstoneHandling.REWRITE_WITH_TOMBSTONE) {
            // The only mode that keeps tombstones for downstream log compaction.
            return record;
        }
        if (deleteTombstoneHandling == DeleteTombstoneHandling.DROP
                || deleteTombstoneHandling == DeleteTombstoneHandling.TOMBSTONE
                || deleteTombstoneHandling == DeleteTombstoneHandling.REWRITE) {
            LOGGER.trace("Tombstone {} arrived and requested to be dropped", record.key());
            return null;
        }
        throw new DebeziumException("Unknown delete handling mode: " + deleteTombstoneHandling);
    }

    @Override
    public R handleDeleteRecord(R record) {
        if (deleteTombstoneHandling == DeleteTombstoneHandling.DROP) {
            LOGGER.trace("Delete message {} requested to be dropped", record.key());
            return null;
        }
        if (deleteTombstoneHandling == DeleteTombstoneHandling.TOMBSTONE) {
            // Convert the delete into a tombstone by unwrapping the (null) after state.
            return afterDelegate.apply(record);
        }
        if (deleteTombstoneHandling == DeleteTombstoneHandling.REWRITE
                || deleteTombstoneHandling == DeleteTombstoneHandling.REWRITE_WITH_TOMBSTONE) {
            LOGGER.trace("Delete message {} requested to be rewritten", record.key());
            R unwrapped = beforeDelegate.apply(record);
            // Mark the unwrapped before state with __deleted = "true".
            return removedDelegate.apply(unwrapped);
        }
        throw new DebeziumException("Unknown delete handling mode: " + deleteTombstoneHandling);
    }

    @Override
    public R handleRecord(R record) {
        R unwrapped = afterDelegate.apply(record);
        if (deleteTombstoneHandling == DeleteTombstoneHandling.REWRITE
                || deleteTombstoneHandling == DeleteTombstoneHandling.REWRITE_WITH_TOMBSTONE) {
            LOGGER.trace("Insert/update message {} requested to be rewritten", record.key());
            // Mark live records with __deleted = "false" so consumers see a uniform schema.
            return updatedDelegate.apply(unwrapped);
        }
        return unwrapped;
    }

    @Override
    public boolean isRewriteMode() {
        return deleteTombstoneHandling == DeleteTombstoneHandling.REWRITE
                || deleteTombstoneHandling == DeleteTombstoneHandling.REWRITE_WITH_TOMBSTONE;
    }
}

View File

@ -0,0 +1,37 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.transforms.strategy;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.ExtractField;
import io.debezium.data.Envelope;
/**
 * A {@link ExtractRecordStrategy} is used by the transformer to determine
 * how to extract Truncate, Delete, Create and Update record from {@link Envelope}
 *
 * @param <R> the subtype of {@link ConnectRecord} on which this transformation will operate
 * @author Harvey Yue
 */
public interface ExtractRecordStrategy<R extends ConnectRecord<R>> {

    /**
     * Handle a truncate/tombstone record (a record whose value is null).
     *
     * @param record the incoming record; never null
     * @return the record to emit, or null to drop it
     */
    R handleTruncateRecord(R record);

    /**
     * Handle a delete record.
     *
     * @param record the incoming record; never null
     * @return the record to emit, or null to drop it
     */
    R handleDeleteRecord(R record);

    /**
     * Handle an insert or update record.
     *
     * @param record the incoming record; never null
     * @return the unwrapped record to emit
     */
    R handleRecord(R record);

    /**
     * @return the delegate that extracts the envelope's "after" field
     */
    ExtractField<R> afterDelegate();

    /**
     * @return the delegate that extracts the envelope's "before" field
     */
    ExtractField<R> beforeDelegate();

    /**
     * @return the delegate that extracts the "updateDescription" field (MongoDB envelopes)
     */
    ExtractField<R> updateDescriptionDelegate();

    /**
     * @return true if this strategy rewrites records by adding the "__deleted" marker field
     */
    boolean isRewriteMode();

    /**
     * Release any resources held by the strategy's delegate transformations.
     */
    void close();
}