DBZ-563 Support tombstone drop on MongoDB Unwrapper

This commit is contained in:
Renato Mefi 2018-11-18 17:57:20 +01:00 committed by Gunnar Morling
parent 16ba3764e5
commit 0d6236c8d8
2 changed files with 27 additions and 1 deletions

View File

@ -111,6 +111,16 @@ public static ArrayEncoding parse(String value, String defaultValue) {
.withDescription("Delimiter to concat between field names from the input record when generating field names for the"
+ "output record.");
private static final Field DROP_TOMBSTONES = Field.create("drop.tombstones")
.withDisplayName("Drop tombstones")
.withType(ConfigDef.Type.BOOLEAN)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.LOW)
.withDefault(true)
.withDescription("Debezium by default generates a tombstone record to enable Kafka compaction after "
+ "a delete record was generated. This record is usually filtered out to avoid duplicates "
+ "as a delete record is converted to a tombstone record, too");
private final ExtractField<R> afterExtractor = new ExtractField.Value<R>();
private final ExtractField<R> patchExtractor = new ExtractField.Value<R>();
private final ExtractField<R> keyExtractor = new ExtractField.Key<R>();
@ -121,6 +131,8 @@ public static ArrayEncoding parse(String value, String defaultValue) {
private boolean flattenStruct;
private String delimiter;
private boolean dropTombstones;
private R getTombstoneRecord(R r) {
SchemaBuilder keySchemaBuilder = SchemaBuilder.struct();
@ -148,6 +160,10 @@ private R getTombstoneRecord(R r) {
public R apply(R r) {
// Tombstone message
if (r.value() == null) {
if (dropTombstones) {
LOGGER.trace("Tombstone {} arrived and requested to be dropped", r.key());
return null;
}
return getTombstoneRecord(r);
}
@ -295,7 +311,7 @@ public void close() {
@Override
public void configure(final Map<String, ?> map) {
final Configuration config = Configuration.from(map);
final Field.Set configFields = Field.setOf(ARRAY_ENCODING, FLATTEN_STRUCT, DELIMITER);
final Field.Set configFields = Field.setOf(ARRAY_ENCODING, FLATTEN_STRUCT, DELIMITER, DROP_TOMBSTONES);
if (!config.validateAndRecord(configFields, LOGGER::error)) {
throw new ConnectException("Unable to validate config.");
@ -306,6 +322,8 @@ public void configure(final Map<String, ?> map) {
flattenStruct = config.getBoolean(FLATTEN_STRUCT);
delimiter = config.getString(DELIMITER);
dropTombstones = config.getBoolean(DROP_TOMBSTONES);
final Map<String, String> afterExtractorConfig = new HashMap<>();
afterExtractorConfig.put("field", "after");
final Map<String, String> patchExtractorConfig = new HashMap<>();

View File

@ -10,6 +10,8 @@
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
@ -46,6 +48,8 @@ public class UnwrapFromMongoDbEnvelopeTestIT extends AbstractConnectorTest {
private static final String COLLECTION_NAME = "source";
private static final String TOPIC_NAME = "mongo.transform.source";
private static final String CONFIG_DROP_TOMBSTONES = "drop.tombstones";
private Configuration config;
private MongoDbTaskContext context;
private UnwrapFromMongoDbEnvelope<SourceRecord> transformation;
@ -91,6 +95,10 @@ public void shouldTransformEvents() throws InterruptedException, IOException {
// Start the connector ...
start(MongoDbConnector.class, config);
final Map<String, String> transformationConfig = new HashMap<>();
transformationConfig.put(CONFIG_DROP_TOMBSTONES, "false");
transformation.configure(transformationConfig);
// Test insert
primary().execute("insert", client -> {
client.getDatabase(DB_NAME).getCollection(COLLECTION_NAME)