DBZ-677 Extracting shared configuration options for both new state extraction SMTs

Gunnar Morling 2019-05-27 12:59:30 +02:00 committed by Jiri Pechanec
parent cb9e916616
commit cf4db8c1a1
5 changed files with 130 additions and 147 deletions
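
Before the diff itself, a minimal usage sketch of the options being shared here, as they would be set on the relational SMT (the class name UnwrapConfigExample and the chosen values are illustrative; property names and defaults come from this commit):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.source.SourceRecord;

import io.debezium.transforms.ExtractNewRecordState;

public class UnwrapConfigExample {
    public static void main(String[] args) {
        // Property names match the fields extracted into ExtractNewRecordStateConfigDefinition
        Map<String, String> props = new HashMap<>();
        props.put("drop.tombstones", "false");        // keep tombstones (default: true)
        props.put("delete.handling.mode", "rewrite"); // none | drop | rewrite
        props.put("operation.header", "true");        // add the '__debezium-operation' header

        ExtractNewRecordState<SourceRecord> smt = new ExtractNewRecordState<>();
        smt.configure(props); // validates against the shared Field definitions
        // smt.apply(record) would now unwrap Debezium envelopes according to these options
        smt.close();
    }
}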

View File

@@ -10,8 +10,6 @@
import java.util.Map.Entry;
import java.util.Set;
import io.debezium.data.Envelope;
import io.debezium.transforms.ExtractNewRecordState.DeleteHandling;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
@@ -31,6 +29,9 @@
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.data.Envelope;
import io.debezium.transforms.ExtractNewRecordStateConfigDefinition;
import io.debezium.transforms.ExtractNewRecordStateConfigDefinition.DeleteHandling;
/**
* Debezium Mongo Connector generates the CDC records in String format. Sink connectors usually are not able to parse
@@ -93,10 +94,6 @@ public static ArrayEncoding parse(String value, String defaultValue) {
}
}
final static String DEBEZIUM_OPERATION_HEADER_KEY = "__debezium-operation";
private static final String DELETED_FIELD = "__deleted";
private static final Logger LOGGER = LoggerFactory.getLogger(ExtractNewDocumentState.class);
private static final Field ARRAY_ENCODING = Field.create("array.encoding")
@@ -126,35 +123,6 @@ public static ArrayEncoding parse(String value, String defaultValue) {
.withDescription("Delimiter to concat between field names from the input record when generating field names for the"
+ "output record.");
private static final Field OPERATION_HEADER = Field.create("operation.header")
.withDisplayName("Adds a message header representing the applied operation")
.withType(ConfigDef.Type.BOOLEAN)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.LOW)
.withDefault(false)
.withDescription("Adds the operation {@link FieldName#OPERATION operation} as a header." +
"Its key is '" + DEBEZIUM_OPERATION_HEADER_KEY + "'");
private static final Field HANDLE_DELETES = Field.create("delete.handling.mode")
.withDisplayName("Handle delete records")
.withEnum(DeleteHandling.class, DeleteHandling.DROP)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDescription("How to handle delete records. Options are: "
+ "none - records are passed,"
+ "drop - records are removed,"
+ "rewrite - __deleted field is added to records.");
private static final Field DROP_TOMBSTONES = Field.create("drop.tombstones")
.withDisplayName("Drop tombstones")
.withType(ConfigDef.Type.BOOLEAN)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.LOW)
.withDefault(true)
.withDescription("Debezium by default generates a tombstone record to enable Kafka compaction after "
+ "a delete record was generated. This record is usually filtered out to avoid duplicates "
+ "as a delete record is converted to a tombstone record, too");
private final ExtractField<R> afterExtractor = new ExtractField.Value<>();
private final ExtractField<R> patchExtractor = new ExtractField.Value<>();
private final ExtractField<R> keyExtractor = new ExtractField.Key<>();
@@ -185,13 +153,13 @@ public R apply(R record) {
return null;
}
if (addOperationHeader) {
record.headers().addString(DEBEZIUM_OPERATION_HEADER_KEY, Envelope.Operation.DELETE.code());
record.headers().addString(ExtractNewRecordStateConfigDefinition.DEBEZIUM_OPERATION_HEADER_KEY, Envelope.Operation.DELETE.code());
}
return newRecord(record, keyDocument, valueDocument);
}
if (addOperationHeader) {
record.headers().addString(DEBEZIUM_OPERATION_HEADER_KEY, ((Struct) record.value()).get("op").toString());
record.headers().addString(ExtractNewRecordStateConfigDefinition.DEBEZIUM_OPERATION_HEADER_KEY, ((Struct) record.value()).get("op").toString());
}
// insert
@@ -216,7 +184,7 @@ public R apply(R record) {
}
if (handleDeletes.equals(DeleteHandling.REWRITE)) {
valueDocument.append(DELETED_FIELD, new BsonBoolean(isDeletion));
valueDocument.append(ExtractNewRecordStateConfigDefinition.DELETED_FIELD, new BsonBoolean(isDeletion));
}
@@ -352,7 +320,10 @@ public void close() {
@Override
public void configure(final Map<String, ?> map) {
final Configuration config = Configuration.from(map);
final Field.Set configFields = Field.setOf(ARRAY_ENCODING, FLATTEN_STRUCT, DELIMITER, OPERATION_HEADER, HANDLE_DELETES, DROP_TOMBSTONES);
final Field.Set configFields = Field.setOf(ARRAY_ENCODING, FLATTEN_STRUCT, DELIMITER,
ExtractNewRecordStateConfigDefinition.OPERATION_HEADER,
ExtractNewRecordStateConfigDefinition.HANDLE_DELETES,
ExtractNewRecordStateConfigDefinition.DROP_TOMBSTONES);
if (!config.validateAndRecord(configFields, LOGGER::error)) {
throw new ConnectException("Unable to validate config.");
@@ -360,13 +331,13 @@ public void configure(final Map<String, ?> map) {
converter = new MongoDataConverter(ArrayEncoding.parse(config.getString(ARRAY_ENCODING)));
addOperationHeader = config.getBoolean(OPERATION_HEADER);
addOperationHeader = config.getBoolean(ExtractNewRecordStateConfigDefinition.OPERATION_HEADER);
flattenStruct = config.getBoolean(FLATTEN_STRUCT);
delimiter = config.getString(DELIMITER);
dropTombstones = config.getBoolean(DROP_TOMBSTONES);
handleDeletes = DeleteHandling.parse(config.getString(HANDLE_DELETES));
dropTombstones = config.getBoolean(ExtractNewRecordStateConfigDefinition.DROP_TOMBSTONES);
handleDeletes = DeleteHandling.parse(config.getString(ExtractNewRecordStateConfigDefinition.HANDLE_DELETES));
final Map<String, String> afterExtractorConfig = new HashMap<>();
afterExtractorConfig.put("field", "after");

View File

@@ -37,6 +37,7 @@
import io.debezium.data.Envelope;
import io.debezium.doc.FixFor;
import io.debezium.schema.TopicSelector;
import io.debezium.transforms.ExtractNewRecordStateConfigDefinition;
import io.debezium.util.Collect;
/**
@@ -110,7 +111,7 @@ public void shouldTransformRecordForInsertEvent() throws InterruptedException {
SourceRecord transformed = transformation.apply(record);
// then assert operation header is insert
Iterator<Header> operationHeader = transformed.headers().allWithName(ExtractNewDocumentState.DEBEZIUM_OPERATION_HEADER_KEY);
Iterator<Header> operationHeader = transformed.headers().allWithName(ExtractNewRecordStateConfigDefinition.DEBEZIUM_OPERATION_HEADER_KEY);
assertThat((operationHeader).hasNext()).isTrue();
assertThat(operationHeader.next().value().toString()).isEqualTo(Envelope.Operation.CREATE.code());
@@ -210,7 +211,7 @@ public void shouldGenerateRecordForUpdateEvent() throws InterruptedException {
SourceRecord transformed = transformation.apply(record);
// then assert operation header is update
Iterator<Header> operationHeader = transformed.headers().allWithName(ExtractNewDocumentState.DEBEZIUM_OPERATION_HEADER_KEY);
Iterator<Header> operationHeader = transformed.headers().allWithName(ExtractNewRecordStateConfigDefinition.DEBEZIUM_OPERATION_HEADER_KEY);
assertThat((operationHeader).hasNext()).isTrue();
assertThat(operationHeader.next().value().toString()).isEqualTo(Envelope.Operation.UPDATE.code());
@@ -386,7 +387,7 @@ public void shouldGenerateRecordHeaderForTombstone() throws InterruptedException
assertThat(key.get("id")).isEqualTo(objId.toString());
// then assert operation header is delete
Iterator<Header> operationHeader = transformed.headers().allWithName(ExtractNewDocumentState.DEBEZIUM_OPERATION_HEADER_KEY);
Iterator<Header> operationHeader = transformed.headers().allWithName(ExtractNewRecordStateConfigDefinition.DEBEZIUM_OPERATION_HEADER_KEY);
assertThat((operationHeader).hasNext()).isTrue();
assertThat(operationHeader.next().value().toString()).isEqualTo(Envelope.Operation.DELETE.code());
@@ -526,7 +527,7 @@ public void shouldGenerateRecordForDeleteEvent() throws InterruptedException {
SourceRecord transformed = transformation.apply(record);
// then assert operation header is delete
Iterator<Header> operationHeader = transformed.headers().allWithName(ExtractNewDocumentState.DEBEZIUM_OPERATION_HEADER_KEY);
Iterator<Header> operationHeader = transformed.headers().allWithName(ExtractNewRecordStateConfigDefinition.DEBEZIUM_OPERATION_HEADER_KEY);
assertThat((operationHeader).hasNext()).isTrue();
assertThat(operationHeader.next().value().toString()).isEqualTo(Envelope.Operation.DELETE.code());
@@ -812,7 +813,7 @@ public void canUseDeprecatedSmt() throws InterruptedException {
SourceRecord transformed = transformation.apply(record);
// then assert operation header is insert
Iterator<Header> operationHeader = transformed.headers().allWithName(UnwrapFromMongoDbEnvelope.DEBEZIUM_OPERATION_HEADER_KEY);
Iterator<Header> operationHeader = transformed.headers().allWithName(ExtractNewRecordStateConfigDefinition.DEBEZIUM_OPERATION_HEADER_KEY);
assertThat((operationHeader).hasNext()).isTrue();
assertThat(operationHeader.next().value().toString()).isEqualTo(Envelope.Operation.CREATE.code());

View File

@@ -19,9 +19,9 @@
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.data.Envelope;
import io.debezium.transforms.ExtractNewRecordStateConfigDefinition.DeleteHandling;
/**
* Debezium generates CDC (<code>Envelope</code>) records that are struct of values containing values
@@ -41,90 +41,9 @@
*/
public class ExtractNewRecordState<R extends ConnectRecord<R>> implements Transformation<R> {
final static String DEBEZIUM_OPERATION_HEADER_KEY = "__debezium-operation";
public static enum DeleteHandling implements EnumeratedValue {
DROP("drop"),
REWRITE("rewrite"),
NONE("none");
private final String value;
private DeleteHandling(String value) {
this.value = value;
}
@Override
public String getValue() {
return value;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @return the matching option, or null if no match is found
*/
public static DeleteHandling parse(String value) {
if (value == null) {
return null;
}
value = value.trim();
for (DeleteHandling option : DeleteHandling.values()) {
if (option.getValue().equalsIgnoreCase(value)) {
return option;
}
}
return null;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @param defaultValue the default value; may be null
* @return the matching option, or null if no match is found and the non-null default is invalid
*/
public static DeleteHandling parse(String value, String defaultValue) {
DeleteHandling mode = parse(value);
if (mode == null && defaultValue != null) {
mode = parse(defaultValue);
}
return mode;
}
}
private static final String ENVELOPE_SCHEMA_NAME_SUFFIX = ".Envelope";
private static final String DELETED_FIELD = "__deleted";
private final Logger logger = LoggerFactory.getLogger(getClass());
private static final Field DROP_TOMBSTONES = Field.create("drop.tombstones")
.withDisplayName("Drop tombstones")
.withType(ConfigDef.Type.BOOLEAN)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.LOW)
.withDefault(true)
.withDescription("Debezium by default generates a tombstone record to enable Kafka compaction after "
+ "a delete record was generated. This record is usually filtered out to avoid duplicates "
+ "as a delete record is converted to a tombstone record, too");
private static final Field HANDLE_DELETES = Field.create("delete.handling.mode")
.withDisplayName("Handle delete records")
.withEnum(DeleteHandling.class, DeleteHandling.DROP)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDescription("How to handle delete records. Options are: "
+ "none - records are passed,"
+ "drop - records are removed (the default),"
+ "rewrite - __deleted field is added to records.");
private static final Field OPERATION_HEADER = Field.create("operation.header")
.withDisplayName("Adds the debezium operation into the message header")
.withType(ConfigDef.Type.BOOLEAN)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.LOW)
.withDefault(false)
.withDescription("Adds the operation {@link FieldName#OPERATION operation} as a header." +
"Its key is '" + DEBEZIUM_OPERATION_HEADER_KEY +"'");
private static final Logger LOGGER = LoggerFactory.getLogger(ExtractNewRecordState.class);
private boolean dropTombstones;
private DeleteHandling handleDeletes;
@@ -137,15 +56,15 @@ public static DeleteHandling parse(String value, String defaultValue) {
@Override
public void configure(final Map<String, ?> configs) {
final Configuration config = Configuration.from(configs);
final Field.Set configFields = Field.setOf(DROP_TOMBSTONES, HANDLE_DELETES);
if (!config.validateAndRecord(configFields, logger::error)) {
final Field.Set configFields = Field.setOf(ExtractNewRecordStateConfigDefinition.DROP_TOMBSTONES, ExtractNewRecordStateConfigDefinition.HANDLE_DELETES);
if (!config.validateAndRecord(configFields, LOGGER::error)) {
throw new ConnectException("Unable to validate config.");
}
dropTombstones = config.getBoolean(DROP_TOMBSTONES);
handleDeletes = DeleteHandling.parse(config.getString(HANDLE_DELETES));
dropTombstones = config.getBoolean(ExtractNewRecordStateConfigDefinition.DROP_TOMBSTONES);
handleDeletes = DeleteHandling.parse(config.getString(ExtractNewRecordStateConfigDefinition.HANDLE_DELETES));
addOperationHeader = config.getBoolean(OPERATION_HEADER);
addOperationHeader = config.getBoolean(ExtractNewRecordStateConfigDefinition.OPERATION_HEADER);
Map<String, String> delegateConfig = new HashMap<>();
delegateConfig.put("field", "before");
@@ -156,12 +75,12 @@ public void configure(final Map<String, ?> configs) {
afterDelegate.configure(delegateConfig);
delegateConfig = new HashMap<>();
delegateConfig.put("static.field", DELETED_FIELD);
delegateConfig.put("static.field", ExtractNewRecordStateConfigDefinition.DELETED_FIELD);
delegateConfig.put("static.value", "true");
removedDelegate.configure(delegateConfig);
delegateConfig = new HashMap<>();
delegateConfig.put("static.field", DELETED_FIELD);
delegateConfig.put("static.field", ExtractNewRecordStateConfigDefinition.DELETED_FIELD);
delegateConfig.put("static.value", "false");
updatedDelegate.configure(delegateConfig);
}
@@ -171,12 +90,12 @@ public R apply(final R record) {
Envelope.Operation operation;
if (record.value() == null) {
if (dropTombstones) {
logger.trace("Tombstone {} arrived and requested to be dropped", record.key());
LOGGER.trace("Tombstone {} arrived and requested to be dropped", record.key());
return null;
}
operation = Envelope.Operation.DELETE;
if (addOperationHeader) {
record.headers().addString(DEBEZIUM_OPERATION_HEADER_KEY, operation.toString());
record.headers().addString(ExtractNewRecordStateConfigDefinition.DEBEZIUM_OPERATION_HEADER_KEY, operation.toString());
}
return record;
}
@@ -186,16 +105,16 @@ public R apply(final R record) {
operation = Envelope.Operation.forCode(operationString);
if (operationString.isEmpty() || operation == null) {
logger.warn("Unknown operation thus unable to add the operation header into the message");
LOGGER.warn("Unknown operation thus unable to add the operation header into the message");
} else {
record.headers().addString(DEBEZIUM_OPERATION_HEADER_KEY, operation.code());
record.headers().addString(ExtractNewRecordStateConfigDefinition.DEBEZIUM_OPERATION_HEADER_KEY, operation.code());
}
}
if (record.valueSchema() == null ||
record.valueSchema().name() == null ||
!record.valueSchema().name().endsWith(ENVELOPE_SCHEMA_NAME_SUFFIX)) {
logger.warn("Expected Envelope for transformation, passing it unchanged");
LOGGER.warn("Expected Envelope for transformation, passing it unchanged");
return record;
}
final R newRecord = afterDelegate.apply(record);
@@ -203,10 +122,10 @@ public R apply(final R record) {
// Handling delete records
switch (handleDeletes) {
case DROP:
logger.trace("Delete message {} requested to be dropped", record.key());
LOGGER.trace("Delete message {} requested to be dropped", record.key());
return null;
case REWRITE:
logger.trace("Delete message {} requested to be rewritten", record.key());
LOGGER.trace("Delete message {} requested to be rewritten", record.key());
final R oldRecord = beforeDelegate.apply(record);
return removedDelegate.apply(oldRecord);
default:
@@ -216,7 +135,7 @@ public R apply(final R record) {
// Handling insert and update records
switch (handleDeletes) {
case REWRITE:
logger.trace("Insert/update message {} requested to be rewritten", record.key());
LOGGER.trace("Insert/update message {} requested to be rewritten", record.key());
return updatedDelegate.apply(newRecord);
default:
return newRecord;
@@ -227,7 +146,7 @@ public R apply(final R record) {
@Override
public ConfigDef config() {
final ConfigDef config = new ConfigDef();
Field.group(config, null, DROP_TOMBSTONES, HANDLE_DELETES, OPERATION_HEADER);
Field.group(config, null, ExtractNewRecordStateConfigDefinition.DROP_TOMBSTONES, ExtractNewRecordStateConfigDefinition.HANDLE_DELETES, ExtractNewRecordStateConfigDefinition.OPERATION_HEADER);
return config;
}

View File

@@ -0,0 +1,92 @@
package io.debezium.transforms;
import org.apache.kafka.common.config.ConfigDef;
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
public class ExtractNewRecordStateConfigDefinition {
public static final String DEBEZIUM_OPERATION_HEADER_KEY = "__debezium-operation";
public static final String DELETED_FIELD = "__deleted";
public static enum DeleteHandling implements EnumeratedValue {
DROP("drop"),
REWRITE("rewrite"),
NONE("none");
private final String value;
private DeleteHandling(String value) {
this.value = value;
}
@Override
public String getValue() {
return value;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @return the matching option, or null if no match is found
*/
public static DeleteHandling parse(String value) {
if (value == null) {
return null;
}
value = value.trim();
for (DeleteHandling option : DeleteHandling.values()) {
if (option.getValue().equalsIgnoreCase(value)) {
return option;
}
}
return null;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @param defaultValue the default value; may be null
* @return the matching option, or null if no match is found and the non-null default is invalid
*/
public static DeleteHandling parse(String value, String defaultValue) {
DeleteHandling mode = parse(value);
if (mode == null && defaultValue != null) {
mode = parse(defaultValue);
}
return mode;
}
}
public static final Field DROP_TOMBSTONES = Field.create("drop.tombstones")
.withDisplayName("Drop tombstones")
.withType(ConfigDef.Type.BOOLEAN)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.LOW)
.withDefault(true)
.withDescription("Debezium by default generates a tombstone record to enable Kafka compaction after "
+ "a delete record was generated. This record is usually filtered out to avoid duplicates "
+ "as a delete record is converted to a tombstone record, too");
public static final Field HANDLE_DELETES = Field.create("delete.handling.mode")
.withDisplayName("Handle delete records")
.withEnum(DeleteHandling.class, DeleteHandling.DROP)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDescription("How to handle delete records. Options are: "
+ "none - records are passed,"
+ "drop - records are removed (the default),"
+ "rewrite - __deleted field is added to records.");
public static final Field OPERATION_HEADER = Field.create("operation.header")
.withDisplayName("Adds a message header representing the applied operation")
.withType(ConfigDef.Type.BOOLEAN)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.LOW)
.withDefault(false)
.withDescription("Adds the operation {@link FieldName#OPERATION operation} as a header." +
"Its key is '" + ExtractNewRecordStateConfigDefinition.DEBEZIUM_OPERATION_HEADER_KEY + "'");
}
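
A quick illustration of the parse semantics defined above (the class name DeleteHandlingParseExample is hypothetical; matching is trimmed and case-insensitive, as in the enum's parse methods):

import io.debezium.transforms.ExtractNewRecordStateConfigDefinition.DeleteHandling;

public class DeleteHandlingParseExample {
    public static void main(String[] args) {
        System.out.println(DeleteHandling.parse(" Rewrite "));  // REWRITE (trimmed, case-insensitive)
        System.out.println(DeleteHandling.parse("bogus"));      // null (no match)
        System.out.println(DeleteHandling.parse(null, "drop")); // DROP (falls back to the default)
    }
}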

View File

@@ -179,7 +179,7 @@ public void testHandleCreateRewrite() {
final SourceRecord unwrapped = transform.apply(createRecord);
assertThat(((Struct) unwrapped.value()).getString("__deleted")).isEqualTo("false");
assertThat(unwrapped.headers()).hasSize(1);
String headerValue = getSourceRecordHeaderByKey(unwrapped, ExtractNewRecordState.DEBEZIUM_OPERATION_HEADER_KEY);
String headerValue = getSourceRecordHeaderByKey(unwrapped, ExtractNewRecordStateConfigDefinition.DEBEZIUM_OPERATION_HEADER_KEY);
assertThat(headerValue).isEqualTo(Envelope.Operation.CREATE.code());
}
}