DBZ-7695 add _id field to tombstone rewrite events, for delete events

This commit is contained in:
ismail simsek 2024-04-12 14:29:22 +00:00 committed by Jiri Pechanec
parent 440b728020
commit f7c4511432
2 changed files with 64 additions and 1 deletions

View File

@ -130,12 +130,21 @@ public static ArrayEncoding parse(String value, String defaultValue) {
.withDescription("Delimiter to concat between field names from the input record when generating field names for the" .withDescription("Delimiter to concat between field names from the input record when generating field names for the"
+ "output record."); + "output record.");
public static final Field REWRITE_TOMBSTONE_DELETES_WITH_ID = Field.create("delete.tombstone.handling.mode.rewrite-with-id")
.withDisplayName("Rewrite delete records with id field")
.withType(ConfigDef.Type.BOOLEAN)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.LOW)
.withDefault(false)
.withDescription(
"When set to true and \"delete.tombstone.handling.mode\" is rewrite, extracts the \"id\" from the deleted record's key and includes it as \"_id\" in the event payload.");
private ExtractField<R> keyExtractor; private ExtractField<R> keyExtractor;
private Flatten<R> recordFlattener; private Flatten<R> recordFlattener;
private MongoDataConverter converter; private MongoDataConverter converter;
private boolean flattenStruct; private boolean flattenStruct;
private String delimiter; private String delimiter;
private boolean rewriteTombstoneDeletesWithId;
private final Field.Set configFields = CONFIG_FIELDS.with(ARRAY_ENCODING, FLATTEN_STRUCT, DELIMITER); private final Field.Set configFields = CONFIG_FIELDS.with(ARRAY_ENCODING, FLATTEN_STRUCT, DELIMITER);
@Override @Override
@ -152,6 +161,7 @@ public void configure(final Map<String, ?> configs) {
flattenStruct = config.getBoolean(FLATTEN_STRUCT); flattenStruct = config.getBoolean(FLATTEN_STRUCT);
delimiter = config.getString(DELIMITER); delimiter = config.getString(DELIMITER);
rewriteTombstoneDeletesWithId = config.getBoolean(REWRITE_TOMBSTONE_DELETES_WITH_ID);
keyExtractor = ConnectRecordUtil.extractKeyDelegate("id"); keyExtractor = ConnectRecordUtil.extractKeyDelegate("id");
recordFlattener = ConnectRecordUtil.flattenValueDelegate(delimiter); recordFlattener = ConnectRecordUtil.flattenValueDelegate(delimiter);
@ -216,6 +226,9 @@ public R doApply(R record) {
// add rewrite field // add rewrite field
if (extractRecordStrategy.isRewriteMode()) { if (extractRecordStrategy.isRewriteMode()) {
valueDocument.append(DELETED_FIELD, new BsonBoolean(isDeletion)); valueDocument.append(DELETED_FIELD, new BsonBoolean(isDeletion));
if (rewriteTombstoneDeletesWithId && !valueDocument.containsKey("_id") && keyDocument.containsKey("id")) {
valueDocument.append("_id", keyDocument.get("id"));
}
} }
return newRecord(record, keyDocument, valueDocument); return newRecord(record, keyDocument, valueDocument);

View File

@ -5,6 +5,7 @@
*/ */
package io.debezium.connector.mongodb.transforms; package io.debezium.connector.mongodb.transforms;
import static io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.REWRITE_TOMBSTONE_DELETES_WITH_ID;
import static io.debezium.junit.EqualityCheck.LESS_THAN; import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
@ -399,6 +400,55 @@ public void shouldAddFieldsForRewriteDeleteEvent() throws InterruptedException {
assertThat(value.get("__db")).isEqualTo(DB_NAME); assertThat(value.get("__db")).isEqualTo(DB_NAME);
} }
@Test
@FixFor("DBZ-7695")
public void shouldAddFieldsForRewriteDeleteEventWithId() throws InterruptedException {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(ADD_FIELDS, "ord,db,op");
props.put(HANDLE_TOMBSTONE_DELETES, "rewrite");
props.put(REWRITE_TOMBSTONE_DELETES_WITH_ID.name(), "true");
transformation.configure(props);
// insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.insertOne(Document.parse("{ '_id' : 4, 'name' : 'Sally' }"));
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// delete
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.deleteOne(RawBsonDocument.parse("{ '_id' : 4 }"));
}
records = consumeRecordsByTopic(2);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(2);
assertNoRecordsToConsume();
// Extract values from SourceRecord
final SourceRecord record = records.allRecordsInOrder().get(0);
final Struct source = ((Struct) record.value()).getStruct(Envelope.FieldName.SOURCE);
// Perform transformation
final SourceRecord transformed = transformation.apply(record);
validate(transformed);
// assert source fields' values
final Struct value = (Struct) transformed.value();
assertThat(value.get("__ord")).isEqualTo(source.getInt32("ord"));
assertThat(value.get("__db")).isEqualTo(source.getString("db"));
assertThat(value.get("__db")).isEqualTo(DB_NAME);
assertThat(value.get("__deleted")).isEqualTo(true);
assertThat(value.get("__op")).isEqualTo("d");
assertThat(value.get("_id")).isEqualTo(4);
}
@Test @Test
public void shouldTransformRecordForInsertEvent() throws InterruptedException { public void shouldTransformRecordForInsertEvent() throws InterruptedException {
waitForStreamingRunning(); waitForStreamingRunning();