DBZ-7066 Add NewExtractNewDocumentStateTest and NewExtractNewDocumentStateTestIT for new option "delete.tombstone.handling.mode"

harveyyue 2023-10-29 15:50:26 +08:00 committed by Jiri Pechanec
parent d21d5838fc
commit 4c3b5540ec
6 changed files with 2157 additions and 103 deletions
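The new option consolidates the legacy drop.tombstones and delete.handling.mode settings into a single knob; the tests below exercise its four values: drop, tombstone, rewrite, and rewrite-with-tombstone. As a minimal sketch (not part of this commit), the SMT can be configured with the new option exactly the way the tests do:

ExtractNewDocumentState<SourceRecord> smt = new ExtractNewDocumentState<>();
// Pick one of: "drop", "tombstone", "rewrite", "rewrite-with-tombstone".
smt.configure(Collect.hashMapOf("delete.tombstone.handling.mode", "tombstone"));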

View File

@@ -8,9 +8,14 @@
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.junit.After;
import org.junit.Before;
@@ -33,6 +38,19 @@ public abstract class AbstractExtractNewDocumentStateTestIT extends AbstractMong
protected static final String DB_NAME = "transform_operations";
protected static final String SERVER_NAME = "mongo";
// for ExtractNewDocumentStateTestIT
protected static final String CONFIG_DROP_TOMBSTONES = "drop.tombstones";
protected static final String HANDLE_DELETES = "delete.handling.mode";
protected static final String HANDLE_TOMBSTONE_DELETES = "delete.tombstone.handling.mode";
protected static final String FLATTEN_STRUCT = "flatten.struct";
protected static final String DELIMITER = "flatten.struct.delimiter";
protected static final String DROP_TOMBSTONE = "drop.tombstones";
protected static final String ADD_HEADERS = "add.headers";
protected static final String ADD_FIELDS = "add.fields";
protected static final String ADD_FIELDS_PREFIX = ADD_FIELDS + ".prefix";
protected static final String ADD_HEADERS_PREFIX = ADD_HEADERS + ".prefix";
protected static final String ARRAY_ENCODING = "array.encoding";
protected ExtractNewDocumentState<SourceRecord> transformation;
protected abstract String getCollectionName();
@@ -120,4 +138,82 @@ SourceRecord getNextRecord() throws InterruptedException {
protected SourceRecord getUpdateRecord() throws InterruptedException {
return getRecordByOperation(Envelope.Operation.UPDATE);
}
// for ExtractNewDocumentStateTestIT
protected SourceRecords createCreateRecordFromJson(String pathOnClasspath) throws Exception {
final List<Document> documents = loadTestDocuments(pathOnClasspath);
try (var client = connect()) {
for (Document document : documents) {
client.getDatabase(DB_NAME).getCollection(getCollectionName()).insertOne(document);
}
}
final SourceRecords records = consumeRecordsByTopic(documents.size());
assertThat(records.recordsForTopic(topicName()).size()).isEqualTo(documents.size());
assertNoRecordsToConsume();
return records;
}
protected SourceRecord createCreateRecord() throws Exception {
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("_id", objId)
.append("name", "Sally")
.append("address", new Document()
.append("struct", "Morris Park Ave")
.append("zipcode", "10462"));
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(getCollectionName()).insertOne(obj);
}
final SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
return records.allRecordsInOrder().get(0);
}
protected SourceRecords createDeleteRecordWithTombstone() throws Exception {
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("_id", objId)
.append("name", "Sally")
.append("address", new Document()
.append("struct", "Morris Park Ave")
.append("zipcode", "10462"));
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(getCollectionName()).insertOne(obj);
}
final SourceRecords createRecords = consumeRecordsByTopic(1);
assertThat(createRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
try (var client = connect()) {
Document filter = Document.parse("{\"_id\": {\"$oid\": \"" + objId + "\"}}");
client.getDatabase(DB_NAME).getCollection(getCollectionName()).deleteOne(filter);
}
final SourceRecords deleteRecords = consumeRecordsByTopic(2);
assertThat(deleteRecords.recordsForTopic(topicName()).size()).isEqualTo(2);
assertNoRecordsToConsume();
return deleteRecords;
}
protected static void waitForStreamingRunning() throws InterruptedException {
waitForStreamingRunning("mongodb", SERVER_NAME);
}
protected String getSourceRecordHeaderByKey(SourceRecord record, String headerKey) {
Iterator<Header> headers = record.headers().allWithName(headerKey);
if (!headers.hasNext()) {
return null;
}
Object value = headers.next().value();
return value != null ? value.toString() : null;
}
}
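These helpers are protected so that concrete integration tests can share them. An illustrative use inside a subclass test, mirroring the delete/tombstone assertions made in shouldTransformEvents further below (hypothetical test body, not part of this commit):

// Hypothetical subclass test reusing the shared helpers, with the SMT in "tombstone" mode.
transformation.configure(Collect.hashMapOf(HANDLE_TOMBSTONE_DELETES, "tombstone"));
SourceRecords pair = createDeleteRecordWithTombstone();
SourceRecord delete = pair.recordsForTopic(topicName()).get(0);
SourceRecord tombstone = pair.recordsForTopic(topicName()).get(1);
// The delete event keeps its key but its value becomes null ...
assertThat(transformation.apply(delete).value()).isNull();
// ... while the original tombstone is swallowed by the SMT.
assertThat(transformation.apply(tombstone)).isNull();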

View File

@@ -9,6 +9,7 @@
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static io.debezium.junit.SkipWhenKafkaVersion.KafkaVersion.KAFKA_241;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertThrows;
import java.util.Collections;
import java.util.HashMap;
@@ -21,7 +22,6 @@
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TestRule;
import io.debezium.connector.AbstractSourceInfo;
@@ -38,14 +38,11 @@ public class ExtractNewDocumentStateTest {
private static final String SERVER_NAME = "serverX";
private ExtractNewDocumentState<SourceRecord> transformation;
protected ExtractNewDocumentState<SourceRecord> transformation;
@Rule
public TestRule skipTestRule = new SkipTestRule();
@Rule
public ExpectedException exceptionRule = ExpectedException.none();
@Before
public void setup() {
transformation = new ExtractNewDocumentState<>();
@@ -211,12 +208,8 @@ public void shouldFailWhenTheSchemaLooksValidButDoesNotHaveTheCorrectFieldsPreKa
valueSchema,
value);
exceptionRule.expect(NullPointerException.class);
// when
SourceRecord transformed = transformation.apply(eventRecord);
assertThat(transformed).isNull();
assertThrows(NullPointerException.class, () -> transformation.apply(eventRecord));
}
@Test
@@ -246,11 +239,7 @@ public void shouldFailWhenTheSchemaLooksValidButDoesNotHaveTheCorrectFieldsPostK
valueSchema,
value);
exceptionRule.expect(IllegalArgumentException.class);
// when
SourceRecord transformed = transformation.apply(eventRecord);
assertThat(transformed).isNull();
assertThrows(IllegalArgumentException.class, () -> transformation.apply(eventRecord));
}
}
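The rewritten tests replace the ExpectedException rule with assertThrows from JUnit 4.13, which scopes the expected exception to a single statement instead of the rest of the test method. A minimal sketch of the pattern (illustrative, using the same names as the test above):

// assertThrows returns the thrown exception, so follow-up assertions remain possible.
NullPointerException npe = assertThrows(NullPointerException.class,
        () -> transformation.apply(eventRecord));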

View File

@@ -60,17 +60,6 @@ public class ExtractNewDocumentStateTestIT extends AbstractExtractNewDocumentSta
// todo: Verify that shard logic added in c84a8a28b472b7c169a8b265905a64db0dbb69e3 doesn't apply now.
private static final String CONFIG_DROP_TOMBSTONES = "drop.tombstones";
private static final String HANDLE_DELETES = "delete.handling.mode";
private static final String FLATTEN_STRUCT = "flatten.struct";
private static final String DELIMITER = "flatten.struct.delimiter";
private static final String DROP_TOMBSTONE = "drop.tombstones";
private static final String ADD_HEADERS = "add.headers";
private static final String ADD_FIELDS = "add.fields";
private static final String ADD_FIELDS_PREFIX = ADD_FIELDS + ".prefix";
private static final String ADD_HEADERS_PREFIX = ADD_HEADERS + ".prefix";
private static final String ARRAY_ENCODING = "array.encoding";
@Override
protected String getCollectionName() {
return "functional";
@@ -2035,81 +2024,4 @@ public void testConnectorAndTransformAvroFieldNameAdjustment() throws Interrupte
final var transformedFieldNames = transformedStruct.schema().fields().stream().map(Field::name).collect(Collectors.toList());
assertThat(transformedFieldNames).containsOnly("_id", "api_version");
}
private SourceRecords createCreateRecordFromJson(String pathOnClasspath) throws Exception {
final List<Document> documents = loadTestDocuments(pathOnClasspath);
try (var client = connect()) {
for (Document document : documents) {
client.getDatabase(DB_NAME).getCollection(getCollectionName()).insertOne(document);
}
}
final SourceRecords records = consumeRecordsByTopic(documents.size());
assertThat(records.recordsForTopic(topicName()).size()).isEqualTo(documents.size());
assertNoRecordsToConsume();
return records;
}
private SourceRecord createCreateRecord() throws Exception {
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("_id", objId)
.append("name", "Sally")
.append("address", new Document()
.append("struct", "Morris Park Ave")
.append("zipcode", "10462"));
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(getCollectionName()).insertOne(obj);
}
final SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
return records.allRecordsInOrder().get(0);
}
private SourceRecords createDeleteRecordWithTombstone() throws Exception {
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("_id", objId)
.append("name", "Sally")
.append("address", new Document()
.append("struct", "Morris Park Ave")
.append("zipcode", "10462"));
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(getCollectionName()).insertOne(obj);
}
final SourceRecords createRecords = consumeRecordsByTopic(1);
assertThat(createRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
try (var client = connect()) {
Document filter = Document.parse("{\"_id\": {\"$oid\": \"" + objId + "\"}}");
client.getDatabase(DB_NAME).getCollection(getCollectionName()).deleteOne(filter);
}
final SourceRecords deleteRecords = consumeRecordsByTopic(2);
assertThat(deleteRecords.recordsForTopic(topicName()).size()).isEqualTo(2);
assertNoRecordsToConsume();
return deleteRecords;
}
private static void waitForStreamingRunning() throws InterruptedException {
waitForStreamingRunning("mongodb", SERVER_NAME);
}
private String getSourceRecordHeaderByKey(SourceRecord record, String headerKey) {
Iterator<Header> headers = record.headers().allWithName(headerKey);
if (!headers.hasNext()) {
return null;
}
Object value = headers.next().value();
return value != null ? value.toString() : null;
}
}

View File

@@ -0,0 +1,26 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.transforms;
import org.junit.Before;
import io.debezium.transforms.extractnewstate.DefaultDeleteHandlingStrategy;
import io.debezium.util.Collect;
/**
* Unit test for {@link ExtractNewDocumentState} and {@link DefaultDeleteHandlingStrategy}.
*
* @author Harvey Yue
*/
public class NewExtractNewDocumentStateTest extends ExtractNewDocumentStateTest {
@Before
public void setup() {
transformation = new ExtractNewDocumentState<>();
transformation.configure(Collect.hashMapOf(
"array.encoding", "array",
"delete.tombstone.handling.mode", "tombstone"));
}
}
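Because setup() overrides the parent's @Before method, every inherited unit test runs with the new option pinned to "tombstone". A sibling variant for another mode would be a one-line change, e.g. (hypothetical class name, not part of this commit):

public class RewriteExtractNewDocumentStateTest extends ExtractNewDocumentStateTest {
    @Before
    public void setup() {
        transformation = new ExtractNewDocumentState<>();
        transformation.configure(Collect.hashMapOf(
                "array.encoding", "array",
                "delete.tombstone.handling.mode", "rewrite"));
    }
}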

View File

@@ -0,0 +1,2030 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.transforms;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.Document;
import org.bson.RawBsonDocument;
import org.bson.types.ObjectId;
import org.junit.Test;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.ChangeStreamPreAndPostImagesOptions;
import com.mongodb.client.model.CreateCollectionOptions;
import io.debezium.config.Configuration;
import io.debezium.connector.mongodb.Module;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.data.Envelope;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.transforms.ExtractNewRecordStateConfigDefinition;
import io.debezium.transforms.extractnewstate.DefaultDeleteHandlingStrategy;
import io.debezium.util.Collect;
/**
* Integration test for {@link ExtractNewDocumentState} and {@link DefaultDeleteHandlingStrategy}. It sends operations into
* MongoDB and listens for the messages generated by the Debezium plug-in. The messages
* are then run through the SMT itself.
*
* @author Harvey Yue
*/
public class NewExtractNewDocumentStateTestIT extends AbstractExtractNewDocumentStateTestIT {
@Override
protected String getCollectionName() {
return "functional";
}
@Test
@FixFor("DBZ-563")
public void shouldDropTombstoneByDefault() throws InterruptedException {
transformation.configure(Collect.hashMapOf(HANDLE_TOMBSTONE_DELETES, "drop"));
// First insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.insertOne(Document.parse("{'_id': 1, 'dataStr': 'hello', 'dataInt': 123, 'dataLong': 80000000000}"));
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
// Test Delete
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).deleteOne(RawBsonDocument.parse("{'_id' : 1}"));
}
// The first delete record to arrive comes from the oplog
SourceRecord firstRecord = getRecordByOperation(Envelope.Operation.DELETE);
final SourceRecord transformedDelete = transformation.apply(firstRecord);
assertThat(transformedDelete).isNull();
// Second record is the tombstone
SourceRecord tombstoneRecord = getNextRecord();
assertThat(tombstoneRecord).isNotNull();
// Test tombstone record is dropped
final SourceRecord transformedTombstone = transformation.apply(tombstoneRecord);
assertThat(transformedTombstone).isNull();
}
@Test
public void shouldTransformEvents() throws InterruptedException, IOException {
final Map<String, String> transformationConfig = new HashMap<>();
transformationConfig.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
transformation.configure(transformationConfig);
// Test insert
try (var client = connect()) {
long timestamp = ZonedDateTime.of(2020, 1, 28, 10, 0, 33, 0, ZoneId.of("UTC")).toEpochSecond();
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.insertOne(Document.parse(
"{"
+ " '_id': 1, "
+ " 'dataStr': 'hello', "
+ " 'dataInt': 123, "
+ " 'dataLong': 80000000000, "
+ " 'dataDate': ISODate(\"2020-01-27T10:47:12.311Z\"), "
+ " 'dataTimestamp': Timestamp(" + timestamp + ", 1)" // seconds since epoch, operation counter within second
+ "}"));
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
final SourceRecord insertRecord = records.recordsForTopic(this.topicName()).get(0);
final SourceRecord transformedInsert = transformation.apply(insertRecord);
final Struct transformedInsertValue = (Struct) transformedInsert.value();
assertThat(transformedInsert.valueSchema().field("_id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
assertThat(transformedInsert.valueSchema().field("dataStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
assertThat(transformedInsert.valueSchema().field("dataInt").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
assertThat(transformedInsert.valueSchema().field("dataLong").schema()).isEqualTo(Schema.OPTIONAL_INT64_SCHEMA);
assertThat(transformedInsertValue.get("_id")).isEqualTo(1);
assertThat(transformedInsertValue.get("dataStr")).isEqualTo("hello");
assertThat(transformedInsertValue.get("dataInt")).isEqualTo(123);
assertThat(transformedInsertValue.get("dataLong")).isEqualTo(80_000_000_000l);
assertThat(transformedInsertValue.get("dataDate")).isEqualTo(Date.from(Instant.from(ZonedDateTime.of(2020, 1, 27, 10, 47, 12, 311000000, ZoneId.of("UTC")))));
assertThat(transformedInsertValue.get("dataTimestamp")).isEqualTo(Date.from(Instant.from(ZonedDateTime.of(2020, 1, 28, 10, 0, 33, 0, ZoneId.of("UTC")))));
// Test update
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).updateOne(RawBsonDocument.parse("{'_id' : 1}"),
RawBsonDocument.parse("{'$set': {'dataStr': 'bye'}}"));
}
records = consumeRecordsByTopic(1);
final SourceRecord candidateUpdateRecord = records.recordsForTopic(this.topicName()).get(0);
if (((Struct) candidateUpdateRecord.value()).get("op").equals("c")) {
// MongoDB does not provide a fully consistent snapshot, so the initial insert
// can arrive both in the initial sync snapshot and in the oplog
records = consumeRecordsByTopic(1);
}
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
final SourceRecord updateRecord = records.recordsForTopic(this.topicName()).get(0);
final SourceRecord transformedUpdate = transformation.apply(updateRecord);
final Struct transformedUpdateValue = (Struct) transformedUpdate.value();
assertThat(transformedUpdate.valueSchema().field("_id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
assertThat(transformedUpdate.valueSchema().field("dataStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
assertThat(transformedUpdateValue.get("_id")).isEqualTo(1);
assertThat(transformedUpdateValue.get("dataStr")).isEqualTo("bye");
// Test Update Multiple Fields
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).updateOne(RawBsonDocument.parse("{'_id' : 1}"),
RawBsonDocument.parse("{'$set': {'newStr': 'hello', 'dataInt': 456}}"));
}
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
final SourceRecord updateMultipleRecord = records.recordsForTopic(this.topicName()).get(0);
final SourceRecord transformedMultipleUpdate = transformation.apply(updateMultipleRecord);
final Struct transformedMultipleUpdateValue = (Struct) transformedMultipleUpdate.value();
assertThat(transformedMultipleUpdate.valueSchema().field("_id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
assertThat(transformedMultipleUpdate.valueSchema().field("newStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
assertThat(transformedMultipleUpdate.valueSchema().field("dataInt").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
assertThat(transformedMultipleUpdateValue.get("_id")).isEqualTo(1);
assertThat(transformedMultipleUpdateValue.get("newStr")).isEqualTo("hello");
assertThat(transformedMultipleUpdateValue.get("dataInt")).isEqualTo(456);
// Test Update with $unset operation
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).updateOne(RawBsonDocument.parse("{'_id' : 1}"),
RawBsonDocument.parse("{'$unset': {'newStr': ''}}"));
}
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
final SourceRecord updateUnsetRecord = records.recordsForTopic(this.topicName()).get(0);
final SourceRecord transformedUnsetUpdate = transformation.apply(updateUnsetRecord);
final Struct transformedUnsetUpdateValue = (Struct) transformedUnsetUpdate.value();
assertThat(transformedUnsetUpdate.valueSchema().field("_id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
assertThat(transformedUnsetUpdateValue.get("_id")).isEqualTo(1);
assertThat(transformedUnsetUpdateValue.schema().field("newStr")).isNull();
// Test FullUpdate
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).updateOne(RawBsonDocument.parse("{'_id' : 1}"),
RawBsonDocument.parse("{'dataStr': 'Hi again'}"));
}
records = consumeRecordsByTopic(1);
final SourceRecord candidateFullUpdateRecord = records.recordsForTopic(this.topicName()).get(0);
if (((Struct) candidateFullUpdateRecord.value()).get("op").equals("c")) {
// MongoDB does not provide a fully consistent snapshot, so the initial insert
// can arrive both in the initial sync snapshot and in the oplog
records = consumeRecordsByTopic(1);
}
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
final SourceRecord fullUpdateRecord = records.recordsForTopic(this.topicName()).get(0);
final SourceRecord transformedFullUpdate = transformation.apply(fullUpdateRecord);
final Struct transformedFullUpdateValue = (Struct) transformedFullUpdate.value();
assertThat(transformedFullUpdate.valueSchema().field("_id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
assertThat(transformedFullUpdate.valueSchema().field("dataStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
assertThat(transformedFullUpdateValue.get("_id")).isEqualTo(1);
assertThat(transformedFullUpdateValue.get("dataStr")).isEqualTo("Hi again");
// Test Delete
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).deleteOne(RawBsonDocument.parse("{'_id' : 1}"));
}
records = consumeRecordsByTopic(2);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(2);
// Test the MongoDB delete operation
final SourceRecord deleteRecord = records.recordsForTopic(this.topicName()).get(0);
final SourceRecord transformedDelete = transformation.apply(deleteRecord);
final Struct transformedDeleteValue = (Struct) transformedDelete.value();
assertThat(transformedDeleteValue).isNull();
// Test tombstone record
final SourceRecord tombstoneRecord = records.recordsForTopic(this.topicName()).get(1);
final SourceRecord transformedTombstone = transformation.apply(tombstoneRecord);
assertThat(transformedTombstone).isNull();
}
@Test
@FixFor("DBZ-1767")
public void shouldSupportDbRef() throws InterruptedException, IOException {
final Map<String, String> transformationConfig = new HashMap<>();
transformationConfig.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
transformationConfig.put("array.encoding", "array");
transformationConfig.put("operation.header", "true");
transformationConfig.put("field.name.adjustment.mode", "avro");
transformation.configure(transformationConfig);
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.insertOne(Document.parse("{ '_id' : 2, 'data' : { '$ref' : 'a2', '$id' : 4, '$db' : 'b2' } }"));
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
final SourceRecord transformed = transformation.apply(records.allRecordsInOrder().get(0));
validate(transformed);
final Struct value = ((Struct) transformed.value()).getStruct("data");
assertThat(value.getString("_ref")).isEqualTo("a2");
assertThat(value.getInt32("_id")).isEqualTo(4);
assertThat(value.getString("_db")).isEqualTo("b2");
}
@Test
@FixFor("DBZ-2680")
public void shouldSupportSubSanitizeFieldName() throws InterruptedException, IOException {
final Map<String, String> transformationConfig = new HashMap<>();
transformationConfig.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
transformationConfig.put("array.encoding", "array");
transformationConfig.put("operation.header", "true");
transformationConfig.put("field.name.adjustment.mode", "avro");
transformation.configure(transformationConfig);
final String doc = "{" +
" \"_id\": \"222\"," +
" \"metrics\": {" +
" \"metric::fct\": {" +
" \"min\": 0," +
" \"max\": 1," +
" }," +
" }" +
"}";
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.insertOne(Document.parse(doc));
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
final SourceRecord transformed = transformation.apply(records.allRecordsInOrder().get(0));
validate(transformed);
final Struct metric = ((Struct) transformed.value()).getStruct("metrics").getStruct("metric__fct");
assertThat(metric.getInt32("min")).isEqualTo(0);
assertThat(metric.getInt32("max")).isEqualTo(1);
}
@Test
@FixFor("DBZ-1442")
public void shouldAddFields() throws InterruptedException {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
props.put(ADD_FIELDS, "ord , db,rs");
transformation.configure(props);
// insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.insertOne(Document.parse("{ '_id' : 3, 'name' : 'Tim' }"));
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// update
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.updateOne(RawBsonDocument.parse("{'_id' : 3}"),
RawBsonDocument.parse("{'$set': {'name': 'Sally'}}"));
}
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// Extract values from SourceRecord
final SourceRecord record = records.allRecordsInOrder().get(0);
final Struct source = ((Struct) record.value()).getStruct(Envelope.FieldName.SOURCE);
// Perform transformation
final SourceRecord transformed = transformation.apply(record);
validate(transformed);
// assert source fields' values
final Struct value = (Struct) transformed.value();
assertThat(value.get("__ord")).isEqualTo(source.getInt32("ord"));
assertThat(value.get("__db")).isEqualTo(source.getString("db"));
assertThat(value.get("__rs")).isEqualTo(source.getString("rs"));
assertThat(value.get("__db")).isEqualTo(DB_NAME);
assertThat(value.get("__rs")).isEqualTo("rs0");
}
@Test
@FixFor("DBZ-1442")
public void shouldAddFieldsForRewriteDeleteEvent() throws InterruptedException {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(ADD_FIELDS, "ord,db,rs");
props.put(HANDLE_TOMBSTONE_DELETES, "rewrite");
transformation.configure(props);
// insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.insertOne(Document.parse("{ '_id' : 4, 'name' : 'Sally' }"));
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// delete
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.deleteOne(RawBsonDocument.parse("{ '_id' : 4 }"));
}
records = consumeRecordsByTopic(2);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(2);
assertNoRecordsToConsume();
// Extract values from SourceRecord
final SourceRecord record = records.allRecordsInOrder().get(0);
final Struct source = ((Struct) record.value()).getStruct(Envelope.FieldName.SOURCE);
// Perform transformation
final SourceRecord transformed = transformation.apply(record);
validate(transformed);
// assert source fields' values
final Struct value = (Struct) transformed.value();
assertThat(value.get("__ord")).isEqualTo(source.getInt32("ord"));
assertThat(value.get("__db")).isEqualTo(source.getString("db"));
assertThat(value.get("__rs")).isEqualTo(source.getString("rs"));
assertThat(value.get("__db")).isEqualTo(DB_NAME);
assertThat(value.get("__rs")).isEqualTo("rs0");
}
@Test
public void shouldTransformRecordForInsertEvent() throws InterruptedException {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
props.put(ADD_HEADERS, "op");
transformation.configure(props);
ObjectId objId = new ObjectId();
Document obj = new Document().append("_id", objId)
.append("name", "Sally")
.append("phone", 123L)
.append("active", true)
.append("scores", Arrays.asList(1.2, 3.4, 5.6));
// insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(obj);
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// Extract values from SourceRecord
final SourceRecord record = records.allRecordsInOrder().get(0);
// Perform transformation
final SourceRecord transformed = transformation.apply(record);
validate(transformed);
// then assert operation header is insert
Iterator<Header> operationHeader = transformed.headers().allWithName(ExtractNewRecordStateConfigDefinition.DEBEZIUM_OPERATION_HEADER_KEY);
assertThat((operationHeader).hasNext()).isTrue();
assertThat(operationHeader.next().value().toString()).isEqualTo(Envelope.Operation.CREATE.code());
// acquire key and value Structs
Struct key = (Struct) transformed.key();
Struct value = (Struct) transformed.value();
// then assert key and its schema
assertThat(key.schema()).isSameAs(transformed.keySchema());
assertThat(key.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(key.get("id")).isEqualTo(objId.toString());
// and then assert value and its schema
assertThat(value.schema().name()).isEqualTo(SERVER_NAME + "." + DB_NAME + "." + getCollectionName());
assertThat(value.schema()).isSameAs(transformed.valueSchema());
assertThat(value.get("name")).isEqualTo("Sally");
assertThat(value.get("_id")).isEqualTo(objId.toString());
assertThat(value.get("phone")).isEqualTo(123L);
assertThat(value.get("active")).isEqualTo(true);
assertThat(value.get("scores")).isEqualTo(Arrays.asList(1.2, 3.4, 5.6));
assertThat(value.schema().field("_id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().field("phone").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT64_SCHEMA);
assertThat(value.schema().field("active").schema()).isEqualTo(SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA);
assertThat(value.schema().field("scores").schema()).isEqualTo(SchemaBuilder.array(SchemaBuilder.OPTIONAL_FLOAT64_SCHEMA).optional().build());
assertThat(value.schema().fields()).hasSize(5);
}
@Test
public void shouldTransformRecordForInsertEventWithComplexIdType() throws InterruptedException {
waitForStreamingRunning();
transformation.configure(Collect.hashMapOf(HANDLE_TOMBSTONE_DELETES, "tombstone"));
Document obj = new Document()
.append("_id", new Document()
.append("company", 32)
.append("dept", "home improvement"))
.append("name", "Sally");
// insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(obj);
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// Extract values from SourceRecord
final SourceRecord record = records.allRecordsInOrder().get(0);
// Perform transformation
final SourceRecord transformed = transformation.apply(record);
validate(transformed);
Struct key = (Struct) transformed.key();
Struct value = (Struct) transformed.value();
// then assert key and its schema
assertThat(key.schema()).isSameAs(transformed.keySchema());
assertThat(key.schema().field("id").schema().field("company").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);
assertThat(key.schema().field("id").schema().field("dept").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(((Struct) key.get("id")).get("company")).isEqualTo(32);
assertThat(((Struct) key.get("id")).get("dept")).isEqualTo("home improvement");
// and then assert value and its schema
assertThat(value.schema()).isSameAs(transformed.valueSchema());
assertThat(((Struct) value.get("_id")).get("company")).isEqualTo(32);
assertThat(((Struct) value.get("_id")).get("dept")).isEqualTo("home improvement");
assertThat(value.get("name")).isEqualTo("Sally");
assertThat(value.schema().field("_id").schema().field("company").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);
assertThat(value.schema().field("_id").schema().field("dept").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().fields()).hasSize(2);
}
@Test
public void shouldGenerateRecordForUpdateEvent() throws InterruptedException {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
props.put(ADD_HEADERS, "op");
transformation.configure(props);
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("_id", objId)
.append("name", "Tim");
// insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(obj);
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
Document updateObj = new Document().append("$set", new Document("name", "Sally"));
// update
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objId + "'}}"), updateObj);
}
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// Extract values from SourceRecord
final SourceRecord record = records.allRecordsInOrder().get(0);
// Perform transformation
final SourceRecord transformed = transformation.apply(record);
validate(transformed);
// then assert operation header is update
Iterator<Header> operationHeader = transformed.headers().allWithName(ExtractNewRecordStateConfigDefinition.DEBEZIUM_OPERATION_HEADER_KEY);
assertThat((operationHeader).hasNext()).isTrue();
assertThat(operationHeader.next().value().toString()).isEqualTo(Envelope.Operation.UPDATE.code());
// acquire key and value Structs
Struct key = (Struct) transformed.key();
Struct value = (Struct) transformed.value();
// then assert key and its schema
assertThat(key.schema()).isSameAs(transformed.keySchema());
assertThat(key.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(key.get("id")).isEqualTo(objId.toString());
// and then assert value and its schema
assertThat(value.schema()).isSameAs(transformed.valueSchema());
assertThat(value.get("name")).isEqualTo("Sally");
assertThat(value.get("_id")).isEqualTo(objId.toString());
assertThat(value.schema().field("_id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().fields()).hasSize(2);
}
@Test
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 6, reason = "Pre-image support in Change Stream is officially released in Mongo 6.0.")
public void shouldGenerateRecordForPartialUpdateEvent() throws InterruptedException {
Configuration config = getBaseConfigBuilder()
.with(MongoDbConnectorConfig.CAPTURE_MODE, MongoDbConnectorConfig.CaptureMode.CHANGE_STREAMS_WITH_PRE_IMAGE)
.build();
restartConnectorWithConfig(config);
waitForStreamingRunning();
transformation.configure(Collect.hashMapOf(HANDLE_TOMBSTONE_DELETES, "tombstone"));
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("_id", objId)
.append("name", "Tim")
.append("phone", 123L)
.append("active", false);
// insert
try (var client = connect()) {
MongoDatabase db1 = client.getDatabase(DB_NAME);
CreateCollectionOptions options = new CreateCollectionOptions();
options.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true));
db1.createCollection(this.getCollectionName(), options);
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(obj);
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
Document updateObj = new Document()
.append("$set", new Document("name", "Sally"))
// the value of "$unset" doesn't matter, and they'll all be unset.
// https://www.mongodb.com/docs/manual/reference/operator/update/unset/#mongodb-update-up.-unset
.append("$unset", new Document().append("phone", true).append("active", false));
// update
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objId + "'}}"), updateObj);
}
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// Extract values from SourceRecord
final SourceRecord record = records.allRecordsInOrder().get(0);
// Perform transformation
final SourceRecord transformed = transformation.apply(record);
validate(transformed);
Struct value = (Struct) transformed.value();
// and then assert value and its schema
assertThat(value.schema()).isSameAs(transformed.valueSchema());
assertThat(value.get("name")).isEqualTo("Sally");
assertThat(value.schema().field("phone")).isNull();
assertThat(value.schema().field("active")).isNull();
assertThat(value.schema().fields()).hasSize(2);
}
@Test
public void shouldGenerateRecordForSetOnlyPartialUpdateEvent() throws InterruptedException {
Configuration config = getBaseConfigBuilder()
.with(MongoDbConnectorConfig.CAPTURE_MODE, MongoDbConnectorConfig.CaptureMode.CHANGE_STREAMS)
.build();
restartConnectorWithConfig(config);
waitForStreamingRunning();
transformation.configure(Collect.hashMapOf(HANDLE_TOMBSTONE_DELETES, "tombstone"));
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("_id", objId)
.append("name", "Tim")
.append("phone", 123L)
.append("active", false);
// insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(obj);
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
Document updateObj = new Document()
.append("$set", new Document("name", "Sally"));
// update
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objId + "'}}"), updateObj);
}
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// Extract values from SourceRecord
final SourceRecord record = records.allRecordsInOrder().get(0);
// Perform transformation
final SourceRecord transformed = transformation.apply(record);
validate(transformed);
Struct value = (Struct) transformed.value();
// and then assert value and its schema
assertThat(value.schema()).isSameAs(transformed.valueSchema());
assertThat(value.get("name")).isEqualTo("Sally");
// no pre-image, so these 2 fields shouldn't be visible
assertThat(value.schema().field("phone")).isNull();
assertThat(value.schema().field("active")).isNull();
assertThat(value.schema().fields()).hasSize(2);
}
@Test
@FixFor("DBZ-612")
public void shouldGenerateRecordForUpdateEventWithUnset() throws InterruptedException {
waitForStreamingRunning();
transformation.configure(Collect.hashMapOf(HANDLE_TOMBSTONE_DELETES, "tombstone"));
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("_id", objId)
.append("name", "Tim")
.append("phone", 123L)
.append("active", false);
// insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(obj);
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
Document updateObj = new Document()
.append("$set", new Document("name", "Sally"))
.append("$unset", new Document().append("phone", true).append("active", false));
// update
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objId + "'}}"), updateObj);
}
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// Extract values from SourceRecord
final SourceRecord record = records.allRecordsInOrder().get(0);
// Perform transformation
final SourceRecord transformed = transformation.apply(record);
validate(transformed);
Struct value = (Struct) transformed.value();
// and then assert value and its schema
assertThat(value.schema()).isSameAs(transformed.valueSchema());
assertThat(value.get("name")).isEqualTo("Sally");
assertThat(value.schema().field("phone")).isNull();
assertThat(value.schema().fields()).hasSize(2);
}
@Test
@FixFor("DBZ-612")
public void shouldGenerateRecordForUnsetOnlyUpdateEvent() throws InterruptedException {
waitForStreamingRunning();
transformation.configure(Collect.hashMapOf(HANDLE_TOMBSTONE_DELETES, "tombstone"));
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("_id", objId)
.append("name", "Sally")
.append("phone", 123L)
.append("active", false);
// insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(obj);
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
Document updateObj = new Document()
.append("$unset", new Document().append("phone", true).append("active", false));
// update
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objId + "'}}"), updateObj);
}
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// Extract values from SourceRecord
final SourceRecord record = records.allRecordsInOrder().get(0);
// Perform transformation
final SourceRecord transformed = transformation.apply(record);
validate(transformed);
Struct value = (Struct) transformed.value();
// and then assert value and its schema
assertThat(value.schema()).isSameAs(transformed.valueSchema());
assertThat(value.schema().field("phone")).isNull();
assertThat(value.schema().fields()).hasSize(2);
}
@Test
@FixFor("DBZ-582")
public void shouldGenerateRecordForDeleteEventWithoutTombstone() throws InterruptedException {
// todo: we might want to rework how the base class handles connector bootstrap to avoid needing to
// stop the connector and restart it with the appropriate configuration here
restartConnectorWithoutEmittingTombstones();
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
transformation.configure(props);
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("_id", objId);
// insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(obj);
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// delete
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.deleteOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objId + "'}}"));
}
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// Extract values from SourceRecord
final SourceRecord record = records.allRecordsInOrder().get(0);
// Perform transformation
final SourceRecord transformed = transformation.apply(record);
validate(transformed);
Struct key = (Struct) transformed.key();
Struct value = (Struct) transformed.value();
// then assert key and its schema
assertThat(key.schema()).isSameAs(transformed.keySchema());
assertThat(key.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(key.get("id")).isEqualTo(objId.toString());
assertThat(value).isNull();
}
@Test
@FixFor("DBZ-1032")
public void shouldGenerateRecordHeaderForTombstone() throws InterruptedException {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(ADD_HEADERS, "op");
// props.put(DROP_TOMBSTONE, "false");
props.put(HANDLE_TOMBSTONE_DELETES, "rewrite-with-tombstone");
transformation.configure(props);
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("_id", objId);
// insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(obj);
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// delete
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.deleteOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objId + "'}}"));
}
records = consumeRecordsByTopic(2);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(2);
assertNoRecordsToConsume();
// Perform transformation
final SourceRecord transformed = transformation.apply(records.allRecordsInOrder().get(1));
validate(transformed);
Struct key = (Struct) transformed.key();
Struct value = (Struct) transformed.value();
// then assert key and its schema
assertThat(key.schema()).isSameAs(transformed.keySchema());
assertThat(key.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(key.get("id")).isEqualTo(objId.toString());
// then assert operation header is delete
Iterator<Header> operationHeader = transformed.headers().allWithName(ExtractNewRecordStateConfigDefinition.DEBEZIUM_OPERATION_HEADER_KEY);
assertThat((operationHeader).hasNext()).isTrue();
assertThat(operationHeader.next().value().toString()).isEqualTo(Envelope.Operation.DELETE.code());
assertThat(value).isNull();
}
@Test
@FixFor("DBZ-583")
public void shouldDropDeleteMessagesByDefault() throws InterruptedException {
restartConnectorWithoutEmittingTombstones();
waitForStreamingRunning();
transformation.configure(Collect.hashMapOf(HANDLE_TOMBSTONE_DELETES, "drop"));
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("_id", objId);
// insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(obj);
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// delete
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.deleteOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objId + "'}}"));
}
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// Perform transformation
final SourceRecord transformed = transformation.apply(records.allRecordsInOrder().get(0));
// then assert transformed message is skipped
assertThat(transformed).isNull();
}
@Test
@FixFor("DBZ-583")
public void shouldRewriteDeleteMessage() throws InterruptedException {
restartConnectorWithoutEmittingTombstones();
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(HANDLE_TOMBSTONE_DELETES, "rewrite");
transformation.configure(props);
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("_id", objId);
// insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(obj);
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// delete
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.deleteOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objId + "'}}"));
}
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// Perform transformation
final SourceRecord transformed = transformation.apply(records.allRecordsInOrder().get(0));
Struct key = (Struct) transformed.key();
Struct value = (Struct) transformed.value();
// then assert key and its schema
assertThat(key.schema()).isSameAs(transformed.keySchema());
assertThat(key.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(key.get("id")).isEqualTo(objId.toString());
assertThat(value.schema().field("__deleted").schema()).isEqualTo(SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA);
assertThat(value.get("__deleted")).isEqualTo(true);
}
@Test
@FixFor("DBZ-583")
public void shouldRewriteMessagesWhichAreNotDeletes() throws InterruptedException {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(HANDLE_TOMBSTONE_DELETES, "rewrite");
transformation.configure(props);
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("_id", objId)
.append("name", "Tim");
// insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(obj);
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
Document updateObj = new Document().append("$set", new Document("name", "Sally"));
// update
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objId + "'}}"), updateObj);
}
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// Perform transformation
final SourceRecord transformed = transformation.apply(records.allRecordsInOrder().get(0));
Struct value = (Struct) transformed.value();
// then assert value and its schema
assertThat(value.schema().field("__deleted").schema()).isEqualTo(SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA);
assertThat(value.get("__deleted")).isEqualTo(false);
}
@Test
public void shouldGenerateRecordForDeleteEvent() throws InterruptedException {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(ADD_HEADERS, "op");
props.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
transformation.configure(props);
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("_id", objId);
// insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(obj);
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// delete
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.deleteOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objId + "'}}"));
}
records = consumeRecordsByTopic(2);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(2);
assertNoRecordsToConsume();
// Perform transformation
final SourceRecord transformed = transformation.apply(records.allRecordsInOrder().get(0));
// then assert operation header is delete
Iterator<Header> operationHeader = transformed.headers().allWithName(ExtractNewRecordStateConfigDefinition.DEBEZIUM_OPERATION_HEADER_KEY);
assertThat((operationHeader).hasNext()).isTrue();
assertThat(operationHeader.next().value().toString()).isEqualTo(Envelope.Operation.DELETE.code());
// acquire key and value Structs
Struct key = (Struct) transformed.key();
Struct value = (Struct) transformed.value();
// then assert key and its schema
assertThat(key.schema()).isSameAs(transformed.keySchema());
assertThat(key.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(key.get("id")).isEqualTo(objId.toString());
assertThat(value).isNull();
}
@Test
@FixFor("DBZ-971")
public void shouldPropagatePreviousRecordHeaders() throws InterruptedException {
waitForStreamingRunning();
transformation.configure(Collect.hashMapOf(HANDLE_TOMBSTONE_DELETES, "drop"));
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("_id", objId)
.append("name", "Tim");
// insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(obj);
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
Document updateObj = new Document().append("$set", new Document("name", "Sally"));
// update
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objId + "'}}"), updateObj);
}
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
final SourceRecord record = records.allRecordsInOrder().get(0);
record.headers().addString("application/debezium-test-header", "shouldPropagatePreviousRecordHeaders");
// Perform transformation
final SourceRecord transformed = transformation.apply(record);
assertThat(transformed.headers()).hasSize(1);
Iterator<Header> headers = transformed.headers().allWithName("application/debezium-test-header");
assertThat(headers.hasNext()).isTrue();
assertThat(headers.next().value().toString()).isEqualTo("shouldPropagatePreviousRecordHeaders");
}
@Test
public void shouldNotFlattenTransformRecordForInsertEvent() throws InterruptedException {
waitForStreamingRunning();
transformation.configure(Collect.hashMapOf(HANDLE_TOMBSTONE_DELETES, "tombstone"));
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("_id", objId)
.append("name", "Sally")
.append("address", new Document()
.append("street", "Morris Park Ave")
.append("zipcode", "10462"));
// insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(obj);
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// Perform transformation
final SourceRecord transformed = transformation.apply(records.allRecordsInOrder().get(0));
Struct key = (Struct) transformed.key();
Struct value = (Struct) transformed.value();
// then assert key and its schema
assertThat(key.schema()).isSameAs(transformed.keySchema());
assertThat(key.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(key.get("id")).isEqualTo(objId.toString());
// and then assert value and its schema
assertThat(value.schema()).isSameAs(transformed.valueSchema());
assertThat(value.get("name")).isEqualTo("Sally");
assertThat(value.get("_id")).isEqualTo(objId.toString());
assertThat(value.get("address")).isEqualTo(new Struct(value.schema().field("address").schema())
.put("street", "Morris Park Ave").put("zipcode", "10462"));
assertThat(value.schema().field("_id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().field("address").schema()).isEqualTo(
SchemaBuilder.struct()
.name(SERVER_NAME + "." + DB_NAME + "." + getCollectionName() + ".address")
.optional()
.field("street", Schema.OPTIONAL_STRING_SCHEMA)
.field("zipcode", Schema.OPTIONAL_STRING_SCHEMA)
.build());
assertThat(value.schema().fields()).hasSize(3);
}
@Test
public void shouldFlattenTransformRecordForInsertEvent() throws InterruptedException {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(FLATTEN_STRUCT, "true");
props.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
transformation.configure(props);
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("_id", objId)
.append("name", "Sally")
.append("address", new Document()
.append("street", "Morris Park Ave")
.append("zipcode", "10462"));
// insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(obj);
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// Perform transformation
final SourceRecord transformed = transformation.apply(records.allRecordsInOrder().get(0));
Struct key = (Struct) transformed.key();
Struct value = (Struct) transformed.value();
// then assert key and its schema
assertThat(key.schema()).isSameAs(transformed.keySchema());
assertThat(key.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(key.get("id")).isEqualTo(objId.toString());
// and then assert value and its schema
assertThat(value.schema()).isSameAs(transformed.valueSchema());
assertThat(value.get("name")).isEqualTo("Sally");
assertThat(value.get("_id")).isEqualTo(objId.toString());
assertThat(value.get("address_street")).isEqualTo("Morris Park Ave");
assertThat(value.get("address_zipcode")).isEqualTo("10462");
assertThat(value.schema().field("_id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().field("address_street").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().field("address_zipcode").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().fields()).hasSize(4);
}

@Test
public void shouldFlattenWithDelimiterTransformRecordForInsertEvent() throws InterruptedException {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(FLATTEN_STRUCT, "true");
props.put(DELIMITER, "-");
props.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
transformation.configure(props);
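// flatten.struct.delimiter overrides the default separator, so the flattened names
// asserted below use "-" instead of "_"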
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("_id", objId)
.append("name", "Sally")
.append("address", new Document()
.append("street", "Morris Park Ave")
.append("zipcode", "10462"));
// insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(obj);
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// Perform transformation
final SourceRecord transformed = transformation.apply(records.allRecordsInOrder().get(0));
Struct key = (Struct) transformed.key();
Struct value = (Struct) transformed.value();
// then assert key and its schema
assertThat(key.schema()).isSameAs(transformed.keySchema());
assertThat(key.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(key.get("id")).isEqualTo(objId.toString());
// and then assert value and its schema
assertThat(value.schema()).isSameAs(transformed.valueSchema());
assertThat(value.get("name")).isEqualTo("Sally");
assertThat(value.get("_id")).isEqualTo(objId.toString());
assertThat(value.get("address-street")).isEqualTo("Morris Park Ave");
assertThat(value.get("address-zipcode")).isEqualTo("10462");
assertThat(value.schema().field("_id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().field("address-street").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().field("address-zipcode").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().fields()).hasSize(4);
}

@Test
public void shouldFlattenWithDelimiterTransformRecordForUpdateEvent() throws InterruptedException {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(FLATTEN_STRUCT, "true");
props.put(DELIMITER, "-");
props.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
transformation.configure(props);
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("_id", objId)
.append("name", "Sally")
.append("address", new Document()
.append("street", "Morris Park Ave")
.append("zipcode", "10462"));
// insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(obj);
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// update
Document updateObj = new Document()
.append("$set", new Document(Collect.hashMapOf(
"address.city", "Canberra",
"address.name", "James",
"address.city2.part", 3)));
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objId + "'}}"), updateObj);
}
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// Perform transformation
final SourceRecord transformed = transformation.apply(records.allRecordsInOrder().get(0));
Struct key = (Struct) transformed.key();
Struct value = (Struct) transformed.value();
// then assert key and its schema
assertThat(key.schema()).isSameAs(transformed.keySchema());
assertThat(key.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(key.get("id")).isEqualTo(objId.toString());
// and then assert value and its schema
assertThat(value.schema()).isSameAs(transformed.valueSchema());
assertThat(value.get("_id")).isEqualTo(objId.toString());
assertThat(value.get("address-city")).isEqualTo("Canberra");
assertThat(value.get("address-name")).isEqualTo("James");
assertThat(value.get("address-city2-part")).isEqualTo(3);
assertThat(value.schema().field("_id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().field("address-city").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().field("address-name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().field("address-city2-part").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);
assertThat(value.schema().fields()).hasSize(7);
}

@Test
@FixFor("DBZ-1791")
public void testAddHeader() throws Exception {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(ADD_HEADERS, "op");
props.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
transformation.configure(props);
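// add.headers=op should copy the envelope operation into a record header named with
// the default "__" prefix, i.e. "__op"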
final SourceRecord createRecord = createCreateRecord();
final SourceRecord transformed = transformation.apply(createRecord);
assertThat(transformed.headers()).hasSize(1);
assertThat(getSourceRecordHeaderByKey(transformed, "__op")).isEqualTo(Envelope.Operation.CREATE.code());
}

@Test
@FixFor("DBZ-1791")
public void testAddHeadersForMissingOrInvalidFields() throws Exception {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(ADD_HEADERS, "op,id");
props.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
transformation.configure(props);
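// "id" is not an envelope field, so its header is expected to be added with a null
// value instead of failing the transformation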
final SourceRecord createRecord = createCreateRecord();
final SourceRecord transformed = transformation.apply(createRecord);
assertThat(transformed.headers()).hasSize(2);
assertThat(getSourceRecordHeaderByKey(transformed, "__op")).isEqualTo(Envelope.Operation.CREATE.code());
assertThat(getSourceRecordHeaderByKey(transformed, "__id")).isNull();
}

@Test
@FixFor({ "DBZ-1791", "DBZ-2504" })
public void testAddHeadersSpecifyingStruct() throws Exception {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(ADD_HEADERS, "op,source.rs,source.collection");
props.put(ADD_HEADERS_PREFIX, "prefix.");
props.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
transformation.configure(props);
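// Nested source fields use dot notation in the config; in the header name the dot
// becomes "_" and the configured "prefix." replaces the default "__"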
final SourceRecord createRecord = createCreateRecord();
final SourceRecord transformed = transformation.apply(createRecord);
assertThat(transformed.headers()).hasSize(3);
assertThat(getSourceRecordHeaderByKey(transformed, "prefix.op")).isEqualTo(Envelope.Operation.CREATE.code());
assertThat(getSourceRecordHeaderByKey(transformed, "prefix.source_rs")).isEqualTo("rs0");
assertThat(getSourceRecordHeaderByKey(transformed, "prefix.source_collection")).isEqualTo(getCollectionName());
}

@Test
@FixFor("DBZ-1791")
public void testAddField() throws Exception {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(ADD_FIELDS, "op");
props.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
transformation.configure(props);
final SourceRecord createRecord = createCreateRecord();
final SourceRecord transformed = transformation.apply(createRecord);
assertThat(((Struct) transformed.value()).get("__op")).isEqualTo(Envelope.Operation.CREATE.code());
}

@Test
@FixFor({ "DBZ-2606", "DBZ-6773" })
public void testNewFieldAndHeaderMapping() throws Exception {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
String fieldPrefix = "";
String headerPrefix = "prefix.";
props.put(ADD_FIELDS, "op:OP");
props.put(ADD_FIELDS_PREFIX, fieldPrefix);
props.put(ADD_HEADERS, "op:OPERATION");
props.put(ADD_HEADERS_PREFIX, headerPrefix);
props.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
transformation.configure(props);
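// The "field:alias" syntax renames the added field/header; with an empty field prefix
// the alias is used verbatim, while headers still receive the configured "prefix."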
final SourceRecord createRecord = createCreateRecord();
final SourceRecord transformed = transformation.apply(createRecord);
assertThat(((Struct) transformed.value()).get(fieldPrefix + "OP")).isEqualTo(Envelope.Operation.CREATE.code());
assertThat(transformed.headers()).hasSize(1);
assertThat(getSourceRecordHeaderByKey(transformed, headerPrefix + "OPERATION")).isEqualTo(Envelope.Operation.CREATE.code());
}

@Test
@FixFor({ "DBZ-1791", "DBZ-2504" })
public void testAddFields() throws Exception {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(ADD_FIELDS, "op , ts_ms");
props.put(ADD_FIELDS_PREFIX, "prefix.");
props.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
transformation.configure(props);
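// Whitespace around the comma-separated list ("op , ts_ms") should be trimmed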
final SourceRecord createRecord = createCreateRecord();
final SourceRecord transformed = transformation.apply(createRecord);
assertThat(((Struct) transformed.value()).get("prefix.op")).isEqualTo(Envelope.Operation.CREATE.code());
assertThat(((Struct) transformed.value()).get("prefix.ts_ms")).isNotNull();
}

@Test
@FixFor("DBZ-1791")
public void testAddFieldsForMissingOptionalField() throws Exception {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(ADD_FIELDS, "op,id");
props.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
transformation.configure(props);
final SourceRecord createRecord = createCreateRecord();
final SourceRecord transformed = transformation.apply(createRecord);
assertThat(((Struct) transformed.value()).get("__op")).isEqualTo(Envelope.Operation.CREATE.code());
assertThat(((Struct) transformed.value()).get("__id")).isNull();
}

@Test
@FixFor("DBZ-1791")
public void testAddFieldsSpecifyStruct() throws Exception {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(ADD_FIELDS, "op,source.rs,source.collection");
props.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
transformation.configure(props);
final SourceRecord createRecord = createCreateRecord();
final SourceRecord transformed = transformation.apply(createRecord);
assertThat(((Struct) transformed.value()).get("__op")).isEqualTo(Envelope.Operation.CREATE.code());
assertThat(((Struct) transformed.value()).get("__source_rs")).isEqualTo("rs0");
assertThat(((Struct) transformed.value()).get("__source_collection")).isEqualTo(getCollectionName());
}

@Test
@FixFor("DBZ-1791")
public void testAddFieldHandleDeleteRewrite() throws Exception {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(HANDLE_TOMBSTONE_DELETES, "rewrite");
props.put(ADD_FIELDS, "op");
transformation.configure(props);
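// "rewrite" keeps the delete record and marks it with a boolean __deleted field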
final SourceRecord deleteRecord = createDeleteRecordWithTombstone().allRecordsInOrder().get(0);
final SourceRecord transformed = transformation.apply(deleteRecord);
assertThat(((Struct) transformed.value()).get("__deleted")).isEqualTo(true);
assertThat(((Struct) transformed.value()).get("__op")).isEqualTo(Envelope.Operation.DELETE.code());
}

@Test
@FixFor("DBZ-1791")
public void testAddFieldsHandleDeleteRewrite() throws Exception {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(HANDLE_TOMBSTONE_DELETES, "rewrite");
props.put(ADD_FIELDS, "op,ts_ms");
transformation.configure(props);
final SourceRecord deleteRecord = createDeleteRecordWithTombstone().allRecordsInOrder().get(0);
final SourceRecord transformed = transformation.apply(deleteRecord);
assertThat(((Struct) transformed.value()).get("__deleted")).isEqualTo(true);
assertThat(((Struct) transformed.value()).get("__op")).isEqualTo(Envelope.Operation.DELETE.code());
assertThat(((Struct) transformed.value()).get("__ts_ms")).isNotNull();
}

@Test
@FixFor("DBZ-1791")
public void testAddFieldsSpecifyStructHandleDeleteRewrite() throws Exception {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(HANDLE_TOMBSTONE_DELETES, "rewrite");
props.put(ADD_FIELDS, "op,source.rs,source.collection");
transformation.configure(props);
final SourceRecord deleteRecord = createDeleteRecordWithTombstone().allRecordsInOrder().get(0);
final SourceRecord transformed = transformation.apply(deleteRecord);
assertThat(((Struct) transformed.value()).get("__deleted")).isEqualTo(true);
assertThat(((Struct) transformed.value()).get("__op")).isEqualTo(Envelope.Operation.DELETE.code());
assertThat(((Struct) transformed.value()).get("__source_rs")).isEqualTo("rs0");
assertThat(((Struct) transformed.value()).get("__source_collection")).isEqualTo(getCollectionName());
}

@Test
@FixFor("DBZ-1791")
public void testAddFieldsHandleDeleteRewriteAndTombstone() throws Exception {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(HANDLE_TOMBSTONE_DELETES, "rewrite-with-tombstone");
props.put(ADD_FIELDS, "op,ts_ms");
transformation.configure(props);
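// "rewrite-with-tombstone" should emit both the rewritten delete record and the
// original tombstone, so both records are consumed and checked below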
final SourceRecords records = createDeleteRecordWithTombstone();
final SourceRecord deleteRecord = records.allRecordsInOrder().get(0);
final SourceRecord deleteTransformed = transformation.apply(deleteRecord);
assertThat(((Struct) deleteTransformed.value()).get("__deleted")).isEqualTo(true);
assertThat(((Struct) deleteTransformed.value()).get("__op")).isEqualTo(Envelope.Operation.DELETE.code());
assertThat(((Struct) deleteTransformed.value()).get("__ts_ms")).isNotNull();
final SourceRecord tombstoneRecord = records.allRecordsInOrder().get(1);
final SourceRecord tombstoneTransformed = transformation.apply(tombstoneRecord);
assertThat(tombstoneTransformed.value()).isNull();
}

@Test
@FixFor("DBZ-2585")
public void testEmptyArray() throws InterruptedException, IOException {
final Map<String, String> transformationConfig = new HashMap<>();
transformationConfig.put("array.encoding", "array");
transformationConfig.put("field.name.adjustment.mode", "avro");
transformationConfig.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
transformation.configure(transformationConfig);
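// An empty array carries no element type to infer, so the field is expected to be
// dropped entirely while the record itself stays valid
// (field.name.adjustment.mode=avro additionally sanitizes field names for Avro)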
// Test insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.insertOne(Document.parse("{'empty_array': [] }"));
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
final SourceRecord insertRecord = records.recordsForTopic(this.topicName()).get(0);
final SourceRecord transformedInsert = transformation.apply(insertRecord);
assertThat(transformedInsert.valueSchema().field("empty_array")).isNull();
VerifyRecord.isValid(transformedInsert);
}

@Test
@FixFor("DBZ-2455")
public void testAddUpdatedFieldAfterUpdate() throws Exception {
waitForStreamingRunning();
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("_id", objId)
.append("a", 1)
.append("b", 2)
.append("c", 3);
// insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(obj);
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// update
Document updateObj = new Document().append("$set", new Document(Collect.hashMapOf("a", 22)));
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objId + "'}}"), updateObj);
}
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
final Map<String, String> props = new HashMap<>();
props.put(ADD_FIELDS, "updateDescription.updatedFields");
props.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
transformation.configure(props);
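// updateDescription.updatedFields should be added as a JSON string field carrying
// only the changed values, named __updateDescription_updatedFields by default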
// Perform transformation
final SourceRecord transformed = transformation.apply(records.allRecordsInOrder().get(0));
Struct key = (Struct) transformed.key();
Struct value = (Struct) transformed.value();
// then assert key and its schema
assertThat(key.schema()).isSameAs(transformed.keySchema());
assertThat(key.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(key.get("id")).isEqualTo(objId.toString());
// and then assert value and its schema
assertThat(value.schema()).isSameAs(transformed.valueSchema());
assertThat(value.get("_id")).isEqualTo(objId.toString());
assertThat(value.get("a")).isEqualTo(22);
assertThat(value.schema().field("_id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().field("a").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);
// 4 data fields + 1 __updateDescription_updatedFields
assertThat(value.schema().fields()).hasSize(4 + 1);
assertThat(value.schema().field("__updateDescription_updatedFields").schema()).isEqualTo(io.debezium.data.Json.builder().optional().build());
assertThat(value.get("__updateDescription_updatedFields")).isEqualTo("{\"a\": 22}");
assertThat(value.get("b")).isEqualTo(2);
assertThat(value.schema().field("b").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);
assertThat(value.get("c")).isEqualTo(3);
assertThat(value.schema().field("c").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);
}

@Test(expected = DataException.class)
@FixFor("DBZ-2316")
public void testShouldThrowExceptionWithElementsDifferingStructures() throws Exception {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(ARRAY_ENCODING, "array");
props.put(ADD_FIELDS, "op,source.ts_ms");
props.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
transformation.configure(props);
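// With array.encoding=array all elements of an array must share one schema; the
// dbz-2316.json fixture contains heterogeneous elements, hence the expected DataException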
final SourceRecords records = createCreateRecordFromJson("dbz-2316.json");
for (SourceRecord record : records.allRecordsInOrder()) {
transformation.apply(record);
}
}

@Test
@FixFor("DBZ-2569")
public void testMatrixType() throws InterruptedException, IOException {
final Map<String, String> transformationConfig = new HashMap<>();
transformationConfig.put("array.encoding", "array");
transformationConfig.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
transformation.configure(transformationConfig);
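// array.encoding=array maps arrays onto Connect ARRAY schemas, including matrices
// (arrays of arrays) and arrays of structs, which the assertions below walk through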
// Test insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.insertOne(Document.parse(
"{"
+ " 'matrix': ["
+ " [1,2,3],"
+ " [4,5,6],"
+ " [7,8,9],"
+ " ]"
+ " ,'array_complex': ["
+ " {'k1' : 'v1','k2' : 1},"
+ " {'k1' : 'v2','k2' : 2},"
+ " ]"
+ " ,'matrix_complex': ["
+ " ["
+ " {'k3' : 'v111',"
+ " 'k4' : [1,2,3]},"
+ " {'k3' : 'v211',"
+ " 'k4' : [4,5,6]}"
+ " ],"
+ " ["
+ " {'k3' : 'v112',"
+ " 'k4' : [7,8]},"
+ " {'k3' : 'v212',"
+ " 'k4' : [8]}"
+ " ],"
+ " ]"
+ "}"));
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
final SourceRecord insertRecord = records.recordsForTopic(this.topicName()).get(0);
final SourceRecord transformedInsert = transformation.apply(insertRecord);
final Struct transformedInsertValue = (Struct) transformedInsert.value();
final Schema matrixSchema = transformedInsert.valueSchema().field("matrix").schema();
assertThat(matrixSchema.type()).isEqualTo(Schema.Type.ARRAY);
final Schema subMatrixSchema = matrixSchema.valueSchema();
assertThat(subMatrixSchema.type()).isEqualTo(Schema.Type.ARRAY);
assertThat(subMatrixSchema.valueSchema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
assertThat(transformedInsertValue.get("matrix")).isEqualTo(Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9)));
final Schema arrayComplexSchema = transformedInsert.valueSchema().field("array_complex").schema();
assertThat(arrayComplexSchema.type()).isEqualTo(Schema.Type.ARRAY);
final Schema subArrayComplexSchema = arrayComplexSchema.valueSchema();
assertThat(subArrayComplexSchema.type()).isEqualTo(Schema.Type.STRUCT);
assertThat(subArrayComplexSchema.field("k1").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
assertThat(subArrayComplexSchema.field("k2").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
final Field k1 = subArrayComplexSchema.field("k1");
final Field k2 = subArrayComplexSchema.field("k2");
final Struct subStruct1 = new Struct(subArrayComplexSchema);
subStruct1.put(k1, "v1");
subStruct1.put(k2, 1);
final Struct subStruct2 = new Struct(subArrayComplexSchema);
subStruct2.put(k1, "v2");
subStruct2.put(k2, 2);
assertThat(transformedInsertValue.get("array_complex")).isEqualTo(Arrays.asList(subStruct1, subStruct2));
final Schema matrixComplexSchema = transformedInsert.valueSchema().field("matrix_complex").schema();
assertThat(matrixComplexSchema.type()).isEqualTo(Schema.Type.ARRAY);
final Schema subMatrixComplexSchema = matrixComplexSchema.valueSchema().schema();
assertThat(subMatrixComplexSchema.type()).isEqualTo(Schema.Type.ARRAY);
Schema structSchema = subMatrixComplexSchema.valueSchema();
assertThat(structSchema.type()).isEqualTo(Schema.Type.STRUCT);
assertThat(structSchema.field("k3").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
assertThat(structSchema.field("k4").schema().type()).isEqualTo(Schema.Type.ARRAY);
assertThat(structSchema.field("k4").schema().valueSchema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
final Field k3 = structSchema.field("k3");
final Field k4 = structSchema.field("k4");
final Struct subStruct11 = new Struct(structSchema);
subStruct11.put(k3, "v111");
subStruct11.put(k4, Arrays.asList(1, 2, 3));
final Struct subStruct12 = new Struct(structSchema);
subStruct12.put(k3, "v112");
subStruct12.put(k4, Arrays.asList(7, 8));
final Struct subStruct21 = new Struct(structSchema);
subStruct21.put(k3, "v211");
subStruct21.put(k4, Arrays.asList(4, 5, 6));
final Struct subStruct22 = new Struct(structSchema);
subStruct22.put(k3, "v212");
subStruct22.put(k4, Arrays.asList(8));
assertThat(transformedInsertValue.get("matrix_complex"))
.isEqualTo(Arrays.asList(Arrays.asList(subStruct11, subStruct21), Arrays.asList(subStruct12, subStruct22)));
}

@Test
@FixFor("DBZ-2569")
public void testMatrixArrayAsDocumentType() throws InterruptedException, IOException {
final Map<String, String> transformationConfig = new HashMap<>();
transformationConfig.put("array.encoding", "document");
transformationConfig.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
transformation.configure(transformationConfig);
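// array.encoding=document converts each array into a struct keyed _0, _1, ... so
// elements of different types (int, string, double) can coexist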
// Test insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.insertOne(Document.parse(
"{"
+ " 'matrix': ["
+ " [1,'aa',3],"
+ " [4,5,'6'],"
+ " [7.0,8],"
+ " ]"
+ "}"));
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
final SourceRecord insertRecord = records.recordsForTopic(this.topicName()).get(0);
final SourceRecord transformedInsert = transformation.apply(insertRecord);
final Schema matrixSchema = transformedInsert.valueSchema().field("matrix").schema();
assertThat(matrixSchema.type()).isEqualTo(Schema.Type.STRUCT);
assertThat(matrixSchema.fields().size()).isEqualTo(3);
final Schema firstSubSchema = matrixSchema.field("_0").schema();
assertThat(firstSubSchema.type()).isEqualTo(Schema.Type.STRUCT);
assertThat(firstSubSchema.fields().size()).isEqualTo(3);
assertThat(firstSubSchema.field("_0").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
assertThat(firstSubSchema.field("_1").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
assertThat(firstSubSchema.field("_2").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
final Schema secondSubSchema = matrixSchema.field("_1").schema();
assertThat(secondSubSchema.type()).isEqualTo(Schema.Type.STRUCT);
assertThat(secondSubSchema.fields().size()).isEqualTo(3);
assertThat(secondSubSchema.field("_0").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
assertThat(secondSubSchema.field("_1").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
assertThat(secondSubSchema.field("_2").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
final Schema thirdSubSchema = matrixSchema.field("_2").schema();
assertThat(thirdSubSchema.type()).isEqualTo(Schema.Type.STRUCT);
assertThat(thirdSubSchema.fields().size()).isEqualTo(2);
assertThat(thirdSubSchema.field("_0").schema()).isEqualTo(Schema.OPTIONAL_FLOAT64_SCHEMA);
assertThat(thirdSubSchema.field("_1").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
final Struct transformedInsertValue = (Struct) transformedInsert.value();
final Struct firstSubStruct = new Struct(firstSubSchema);
firstSubStruct.put(firstSubSchema.field("_0"), 1);
firstSubStruct.put(firstSubSchema.field("_1"), "aa");
firstSubStruct.put(firstSubSchema.field("_2"), 3);
final Struct secondSubStruct = new Struct(secondSubSchema);
secondSubStruct.put(secondSubSchema.field("_0"), 4);
secondSubStruct.put(secondSubSchema.field("_1"), 5);
secondSubStruct.put(secondSubSchema.field("_2"), "6");
final Struct thirdSubStruct = new Struct(thirdSubSchema);
thirdSubStruct.put(thirdSubSchema.field("_0"), 7.0);
thirdSubStruct.put(thirdSubSchema.field("_1"), 8);
final Struct struct = new Struct(matrixSchema);
struct.put(matrixSchema.field("_0"), firstSubStruct);
struct.put(matrixSchema.field("_1"), secondSubStruct);
struct.put(matrixSchema.field("_2"), thirdSubStruct);
assertThat(transformedInsertValue.get("matrix")).isEqualTo(struct);
}

@Test
@FixFor("DBZ-5434")
public void shouldSupportNestedArrays() throws InterruptedException {
waitForStreamingRunning();
transformation.configure(Collect.hashMapOf(HANDLE_TOMBSTONE_DELETES, "tombstone"));
// Test insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.insertOne(Document.parse("{\"_id\":ObjectId(\"6182b1a25711ed59dd6a1d6c\"),\"f1\":{\"f2\":[{\"f3\":{}},{\"f3\":{\"f5\":5}}]}}"));
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
SourceRecord insertRecord = records.recordsForTopic(this.topicName()).get(0);
SourceRecord transformedInsert = transformation.apply(insertRecord);
Struct transformedInsertValue = (Struct) transformedInsert.value();
Schema transformedInsertSchema = transformedInsert.valueSchema();
transformedInsertSchema.field("f1").schema().field("f2");
assertThat(transformedInsertSchema.field("f1").schema()
.field("f2").schema().valueSchema()
.field("f3").schema()
.field("f5").schema().type()).isEqualTo(Schema.INT32_SCHEMA.type());
assertThat(transformedInsertValue.getStruct("f1").getArray("f2").size()).isEqualTo(2);
// Test delete
try (var client = connect()) {
client.getDatabase(DB_NAME)
.getCollection(this.getCollectionName())
.deleteOne(RawBsonDocument.parse("{'_id' : ObjectId('6182b1a25711ed59dd6a1d6c')}"));
}
records = consumeRecordsByTopic(2);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(2);
// Test insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.insertMany(Collect.arrayListOf(
"{\"_id\":ObjectId(\"6182b1a25711ed59dd6a1d6c\"),\"f1\":{\"f2\":[{\"f3\":[]},{\"f3\":[{\"f5\":5}]}]}}",
"{\"_id\":ObjectId(\"6182b1a25711ed59dd6a1d6d\"),\"f1\":{\"f2\":[{\"f3\":[]},{\"f3\":[]}]}}")
.stream().map(Document::parse).collect(Collectors.toList()));
}
records = consumeRecordsByTopic(2);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(2);
List<SourceRecord> transformedInserts = records.allRecordsInOrder().stream()
.map(transformation::apply)
.collect(Collectors.toList());
transformedInsertValue = (Struct) transformedInserts.get(0).value();
assertThat(transformedInsertValue.getStruct("f1").getArray("f2").size()).isEqualTo(2);
transformedInsertValue = (Struct) transformedInserts.get(1).value();
List<Struct> f2 = transformedInsertValue.getStruct("f1").getArray("f2");
assertThat(f2.size()).isEqualTo(2);
assertThat(f2.get(0).getArray("f3").size()).isEqualTo(0);
}

@Test
@FixFor({ "DBZ-5834", "DBZ-6774" })
public void shouldAddUpdateDescription() throws Exception {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(ADD_HEADERS, "updateDescription.updatedFields,nonexistentField,version");
props.put(ADD_HEADERS_PREFIX, "prefix.");
props.put(HANDLE_TOMBSTONE_DELETES, "tombstone");
transformation.configure(props);
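// updateDescription.updatedFields is rendered as a JSON string header; a nonexistent
// field should resolve to a null header and "version" to the connector's Module.version()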
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("_id", objId)
.append("name", "Sally")
.append("address", new Document()
.append("street", "Morris Park Ave")
.append("zipcode", "10462"));
// insert
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(obj);
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// update
Document updateObj = new Document()
.append("$set", new Document(Collect.hashMapOf(
"name", "Mary",
"zipcode", "11111")));
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objId + "'}}"), updateObj);
}
SourceRecords updateRecords = consumeRecordsByTopic(1);
assertThat(updateRecords.recordsForTopic(this.topicName()).size()).isEqualTo(1);
// do the transform
final SourceRecord transformed = transformation.apply(updateRecords.recordsForTopic(this.topicName()).get(0));
// verify headers
final String expectedUpdateFields = "{\"name\": \"Mary\", \"zipcode\": \"11111\"}";
assertThat(getSourceRecordHeaderByKey(transformed, "prefix.updateDescription_updatedFields")).isEqualTo(expectedUpdateFields);
assertThat(getSourceRecordHeaderByKey(transformed, "prefix.nonexistentField")).isNull();
assertThat(getSourceRecordHeaderByKey(transformed, "prefix.version")).isEqualTo(Module.version());
}

@Test
@FixFor("DBZ-6725")
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 6, reason = "Pre-image support in change streams was officially released in MongoDB 6.0.")
public void shouldGenerateRecordForDeleteEventsDeleteHandlingRewrite() throws InterruptedException {
Configuration config = getBaseConfigBuilder()
.with(MongoDbConnectorConfig.CAPTURE_MODE, MongoDbConnectorConfig.CaptureMode.CHANGE_STREAMS_WITH_PRE_IMAGE)
.build();
restartConnectorWithConfig(config);
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(HANDLE_TOMBSTONE_DELETES, "rewrite");
transformation.configure(props);
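// With pre-images enabled the delete event carries the document's last state, so the
// rewritten record can expose the full value plus __deleted=true instead of only the id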
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("_id", objId)
.append("dataStr", "Hello");
// insert
try (var client = connect()) {
MongoDatabase db1 = client.getDatabase(DB_NAME);
CreateCollectionOptions options = new CreateCollectionOptions();
options.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true));
db1.createCollection(this.getCollectionName(), options);
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(obj);
}
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// delete
try (var client = connect()) {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.deleteOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objId + "'}}"));
}
records = consumeRecordsByTopic(2);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(2);
assertNoRecordsToConsume();
// Perform transformation
final SourceRecord transformed = transformation.apply(records.allRecordsInOrder().get(0));
Struct key = (Struct) transformed.key();
Struct value = (Struct) transformed.value();
// then assert key and its schema
assertThat(key.schema()).isSameAs(transformed.keySchema());
assertThat(key.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(key.get("id")).isEqualTo(objId.toString());
// assert value and its schema
assertThat(value.schema().field("_id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().field("dataStr").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().field("__deleted").schema()).isEqualTo(SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA);
assertThat(value.get("_id")).isEqualTo(objId.toString());
assertThat(value.get("dataStr")).isEqualTo("Hello");
assertThat(value.get("__deleted")).isEqualTo(true);
}
}

View File

@ -62,6 +62,7 @@ public R handleDeleteRecord(R record) {
if (oldRecord.value() instanceof Struct) {
return removedDelegate.apply(oldRecord);
}
return oldRecord;
default:
throw new DebeziumException("Unknown delete handling mode: " + deleteTombstoneHandling);
}