diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/JsonSerialization.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/JsonSerialization.java new file mode 100644 index 000000000..2291a5912 --- /dev/null +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/JsonSerialization.java @@ -0,0 +1,79 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mongodb; + +import java.util.function.Function; + +import org.bson.Document; +import org.bson.codecs.Encoder; +import org.bson.json.JsonMode; +import org.bson.json.JsonWriterSettings; + +import com.mongodb.BasicDBObject; +import com.mongodb.DBCollection; +import com.mongodb.MongoClient; + +/** + * A class responsible for serialization of message keys and values to MongoDB compatible JSON + * + * @author Jiri Pechanec + * + */ +class JsonSerialization { + + @FunctionalInterface + public static interface Transformer extends Function { + } + + /** + * Common settings for writing JSON strings using a compact JSON format + */ + public static final JsonWriterSettings COMPACT_JSON_SETTINGS = JsonWriterSettings.builder() + .outputMode(JsonMode.STRICT) + .indent(true) + .indentCharacters("") + .newLineCharacters("") + .build(); + + /** + * Common settings for writing JSON strings using a compact JSON format + */ + private static final JsonWriterSettings SIMPLE_JSON_SETTINGS = JsonWriterSettings.builder() + .outputMode(JsonMode.RELAXED) + .indent(true) + .indentCharacters("") + .newLineCharacters("") + .build(); + + private final Transformer transformer; + + public JsonSerialization() { + final Encoder encoder = MongoClient.getDefaultCodecRegistry().get(Document.class); + transformer = (doc) -> doc.toJson(COMPACT_JSON_SETTINGS, encoder); + } + + public String getDocumentId(Document document) { + if (document == null) { + return null; + } + // The serialized value is in format {"_": xxx} so we need to remove the starting dummy field name and closing brace + final String keyValue = new BasicDBObject("_", document.get(DBCollection.ID_FIELD_NAME)).toJson(SIMPLE_JSON_SETTINGS); + final int start = 6; + final int end = keyValue.length() - 1; + if (!(end > start)) { + throw new IllegalStateException("Serialized JSON object '" + keyValue + "' is not in expected format"); + } + return keyValue.substring(start, end); + } + + public String getDocumentValue(Document document) { + return transformer.apply(document); + } + + public Transformer getTransformer() { + return transformer; + } +} diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSchema.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSchema.java index 5d1980ee7..4838f75e9 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSchema.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSchema.java @@ -7,22 +7,12 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.function.Function; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; -import org.bson.Document; -import org.bson.codecs.Encoder; -import org.bson.json.JsonMode; -import org.bson.json.JsonWriterSettings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.mongodb.DBCollection; -import com.mongodb.MongoClient; -import com.mongodb.util.JSONSerializers; -import com.mongodb.util.ObjectSerializer; - import io.debezium.annotation.ThreadSafe; import io.debezium.connector.mongodb.FieldSelector.FieldFilter; import io.debezium.data.Envelope; @@ -40,31 +30,19 @@ @ThreadSafe public class MongoDbSchema implements DatabaseSchema { - /** - * Common settings for writing JSON strings using a compact JSON format - */ - public static final JsonWriterSettings COMPACT_JSON_SETTINGS = JsonWriterSettings.builder() - .outputMode(JsonMode.STRICT) - .indent(true) - .indentCharacters("") - .newLineCharacters("") - .build(); - - private static final ObjectSerializer jsonSerializer = JSONSerializers.getStrict(); private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSchema.class); private final Filters filters; private final TopicSelector topicSelector; private final Schema sourceSchema; private final SchemaNameAdjuster adjuster = SchemaNameAdjuster.create(LOGGER); - private final Function valueTransformer; private final ConcurrentMap collections = new ConcurrentHashMap<>(); + private final JsonSerialization serialization = new JsonSerialization(); public MongoDbSchema(Filters filters, TopicSelector topicSelector, Schema sourceSchema) { this.filters = filters; this.topicSelector = topicSelector; this.sourceSchema = sourceSchema; - this.valueTransformer = resolveValueTransformer(); } @Override @@ -99,10 +77,10 @@ public DataCollectionSchema schemaFor(CollectionId collectionId) { id, fieldFilter, keySchema, - this::getDocumentId, + serialization::getDocumentId, envelope, valueSchema, - this::getDocumentValue); + serialization::getDocumentValue); }); } @@ -118,20 +96,4 @@ public void assureNonEmptySchema() { LOGGER.warn(DatabaseSchema.NO_CAPTURED_DATA_COLLECTIONS_WARNING); } } - - private String getDocumentId(Document document) { - if (document == null) { - return null; - } - return jsonSerializer.serialize(document.get(DBCollection.ID_FIELD_NAME)); - } - - private String getDocumentValue(Document document) { - return valueTransformer.apply(document); - } - - private static Function resolveValueTransformer() { - Encoder encoder = MongoClient.getDefaultCodecRegistry().get(Document.class); - return (doc) -> doc.toJson(COMPACT_JSON_SETTINGS, encoder); - } } diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/FieldBlacklistIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/FieldBlacklistIT.java index 4b583222f..49d706ab4 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/FieldBlacklistIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/FieldBlacklistIT.java @@ -5,7 +5,7 @@ */ package io.debezium.connector.mongodb; -import static io.debezium.connector.mongodb.MongoDbSchema.COMPACT_JSON_SETTINGS; +import static io.debezium.connector.mongodb.JsonSerialization.COMPACT_JSON_SETTINGS; import static io.debezium.data.Envelope.FieldName.AFTER; import static org.fest.assertions.Assertions.assertThat; import static org.fest.assertions.Fail.fail; diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/FieldExcludeListIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/FieldExcludeListIT.java index 1e61780e5..338e8addd 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/FieldExcludeListIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/FieldExcludeListIT.java @@ -5,7 +5,7 @@ */ package io.debezium.connector.mongodb; -import static io.debezium.connector.mongodb.MongoDbSchema.COMPACT_JSON_SETTINGS; +import static io.debezium.connector.mongodb.JsonSerialization.COMPACT_JSON_SETTINGS; import static io.debezium.data.Envelope.FieldName.AFTER; import static org.fest.assertions.Assertions.assertThat; import static org.fest.assertions.Fail.fail; diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/FieldRenamesIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/FieldRenamesIT.java index b0ea43d58..f104ff27d 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/FieldRenamesIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/FieldRenamesIT.java @@ -5,7 +5,7 @@ */ package io.debezium.connector.mongodb; -import static io.debezium.connector.mongodb.MongoDbSchema.COMPACT_JSON_SETTINGS; +import static io.debezium.connector.mongodb.JsonSerialization.COMPACT_JSON_SETTINGS; import static io.debezium.data.Envelope.FieldName.AFTER; import static org.fest.assertions.Assertions.assertThat; import static org.junit.Assert.fail; diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java index 039df01d0..018a619e8 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java @@ -5,7 +5,8 @@ */ package io.debezium.connector.mongodb; -import static io.debezium.connector.mongodb.MongoDbSchema.COMPACT_JSON_SETTINGS; +import static io.debezium.connector.mongodb.JsonSerialization.COMPACT_JSON_SETTINGS; +import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME; import static org.fest.assertions.Assertions.assertThat; import static org.junit.Assert.fail; @@ -13,6 +14,8 @@ import java.io.InputStream; import java.math.BigDecimal; import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Calendar; import java.util.HashSet; @@ -45,8 +48,6 @@ import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.InsertOneOptions; -import com.mongodb.util.JSON; -import com.mongodb.util.JSONSerializers; import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; @@ -333,8 +334,8 @@ public void shouldConsumeAllEventsFromDatabase() throws InterruptedException, IO Testing.debug("Update event: " + updateRecord); Struct insertKey = (Struct) insertRecord.key(); Struct updateKey = (Struct) updateRecord.key(); - String insertId = JSON.parse(insertKey.getString("id")).toString(); - String updateId = JSON.parse(updateKey.getString("id")).toString(); + String insertId = toObjectId(insertKey.getString("id")).toString(); + String updateId = toObjectId(updateKey.getString("id")).toString(); assertThat(insertId).isEqualTo(id.get()); assertThat(updateId).isEqualTo(id.get()); @@ -364,7 +365,7 @@ public void shouldConsumeAllEventsFromDatabase() throws InterruptedException, IO Testing.debug("Delete event: " + deleteRecord); Testing.debug("Tombstone event: " + tombStoneRecord); Struct deleteKey = (Struct) deleteRecord.key(); - String deleteId = JSON.parse(deleteKey.getString("id")).toString(); + String deleteId = toObjectId(deleteKey.getString("id")).toString(); assertThat(deleteId).isEqualTo(id.get()); } @@ -1348,7 +1349,7 @@ public void shouldGenerateRecordForInsertEvent() throws Exception { final Struct value = (Struct) deleteRecord.value(); assertThat(key.schema()).isSameAs(deleteRecord.keySchema()); - assertThat(key.get("id")).isEqualTo("{ \"$oid\" : \"" + objId + "\"}"); + assertThat(key.get("id")).isEqualTo(formatObjectId(objId)); assertThat(value.schema()).isSameAs(deleteRecord.valueSchema()); // assertThat(value.getString(Envelope.FieldName.BEFORE)).isNull(); @@ -1402,7 +1403,7 @@ public void shouldGenerateRecordForUpdateEvent() throws Exception { final Struct value = (Struct) deleteRecord.value(); assertThat(key.schema()).isSameAs(deleteRecord.keySchema()); - assertThat(key.get("id")).isEqualTo(JSONSerializers.getStrict().serialize(objId)); + assertThat(key.get("id")).isEqualTo(formatObjectId(objId)); Document patchObj = Document.parse(value.getString(MongoDbFieldName.PATCH)); patchObj.remove("$v"); @@ -1455,7 +1456,7 @@ public void shouldGeneratorRecordForDeleteEvent() throws Exception { final Struct value = (Struct) deleteRecord.value(); assertThat(key.schema()).isSameAs(deleteRecord.keySchema()); - assertThat(key.get("id")).isEqualTo(JSONSerializers.getStrict().serialize(objId)); + assertThat(key.get("id")).isEqualTo(formatObjectId(objId)); assertThat(value.schema()).isSameAs(deleteRecord.valueSchema()); assertThat(value.getString(Envelope.FieldName.AFTER)).isNull(); @@ -1470,7 +1471,7 @@ public void shouldGeneratorRecordForDeleteEvent() throws Exception { final SourceRecord tombstoneRecord = records.allRecordsInOrder().get(1); final Struct tombstoneKey = (Struct) tombstoneRecord.key(); assertThat(tombstoneKey.schema()).isSameAs(tombstoneRecord.keySchema()); - assertThat(tombstoneKey.get("id")).isEqualTo(JSONSerializers.getStrict().serialize(objId)); + assertThat(tombstoneKey.get("id")).isEqualTo(formatObjectId(objId)); assertThat(tombstoneRecord.value()).isNull(); assertThat(tombstoneRecord.valueSchema()).isNull(); } @@ -1514,7 +1515,7 @@ public void shouldGenerateRecordForDeleteEventWithoutTombstone() throws Exceptio final Struct value = (Struct) record.value(); assertThat(key.schema()).isSameAs(record.keySchema()); - assertThat(key.get("id")).isEqualTo(JSONSerializers.getStrict().serialize(objId)); + assertThat(key.get("id")).isEqualTo(formatObjectId(objId)); assertThat(value.schema()).isSameAs(record.valueSchema()); assertThat(value.getString(Envelope.FieldName.AFTER)).isNull(); @@ -1581,13 +1582,14 @@ public void shouldGenerateRecordsWithCorrectlySerializedId() throws Exception { assertSourceRecordKeyFieldIsEqualTo(sourceRecords.get(0), "id", "2147483657"); assertSourceRecordKeyFieldIsEqualTo(sourceRecords.get(1), "id", "\"123\""); - assertSourceRecordKeyFieldIsEqualTo(sourceRecords.get(2), "id", "{ \"company\" : 32 , \"dept\" : \"home improvement\"}"); + assertSourceRecordKeyFieldIsEqualTo(sourceRecords.get(2), "id", "{\"company\": 32,\"dept\": \"home improvement\"}"); // that's actually not what https://docs.mongodb.com/manual/reference/mongodb-extended-json/#date suggests; // seems JsonSerializers is not fully compliant with that description - assertSourceRecordKeyFieldIsEqualTo(sourceRecords.get(3), "id", "{ \"$date\" : " + cal.getTime().getTime() + "}"); + assertSourceRecordKeyFieldIsEqualTo(sourceRecords.get(3), "id", + "{\"$date\": \"" + ZonedDateTime.ofInstant(Instant.ofEpochMilli(cal.getTimeInMillis()), ZoneId.of("Z")).format(ISO_OFFSET_DATE_TIME) + "\"}"); if (decimal128Supported) { - assertSourceRecordKeyFieldIsEqualTo(sourceRecords.get(4), "id", "{ \"$numberDecimal\" : \"123.45678\"}"); + assertSourceRecordKeyFieldIsEqualTo(sourceRecords.get(4), "id", "{\"$numberDecimal\": \"123.45678\"}"); } } @@ -1627,7 +1629,7 @@ public void shouldSupportDbRef2() throws Exception { final Struct key = (Struct) record.key(); final Struct value = (Struct) record.value(); assertThat(key.schema()).isSameAs(record.keySchema()); - assertThat(key.get("id")).isEqualTo("{ \"$oid\" : \"" + objId + "\"}"); + assertThat(key.get("id")).isEqualTo(formatObjectId(objId)); assertThat(value.schema()).isSameAs(record.valueSchema()); @@ -1758,7 +1760,7 @@ public void shouldReplicateContent() throws Exception { VerifyRecord.isValid(record); final Struct key = (Struct) record.key(); - final ObjectId id = (ObjectId) (JSON.parse(key.getString("id"))); + final ObjectId id = toObjectId(key.getString("id")); foundIds.add(id); if (record.value() != null) { final Struct value = (Struct) record.value(); @@ -1896,7 +1898,7 @@ public void shouldGenerateRecordForUpdateEventUsingLegacyV1SourceInfo() throws E final Struct value = (Struct) deleteRecord.value(); assertThat(key.schema()).isSameAs(deleteRecord.keySchema()); - assertThat(key.get("id")).isEqualTo(JSONSerializers.getStrict().serialize(objId)); + assertThat(key.get("id")).isEqualTo(formatObjectId(objId)); Document patchObj = Document.parse(value.getString(MongoDbFieldName.PATCH)); patchObj.remove("$v"); @@ -1908,6 +1910,10 @@ public void shouldGenerateRecordForUpdateEventUsingLegacyV1SourceInfo() throws E assertThat(value.getInt64(Envelope.FieldName.TIMESTAMP)).isGreaterThanOrEqualTo(timestamp.toEpochMilli()); } + private String formatObjectId(ObjectId objId) { + return "{\"$oid\": \"" + objId + "\"}"; + } + private void insertDocuments(String dbName, String collectionName, Document... documents) { primary().execute("store documents", mongo -> { Testing.debug("Storing in '" + dbName + "." + collectionName + "' document"); @@ -1939,4 +1945,8 @@ private void deleteDocument(String dbName, String collectionName, ObjectId objec coll.deleteOne(filter); }); } + + private ObjectId toObjectId(String oid) { + return new ObjectId(oid.substring(10, oid.length() - 2)); + } }