DBZ-1322 Removed dependency on deprecated MongoDB driver code

This commit is contained in:
Jiri Pechanec 2020-11-26 09:51:42 +01:00 committed by Gunnar Morling
parent b7fede7d53
commit ea019c3b2c
6 changed files with 112 additions and 61 deletions

View File

@ -0,0 +1,79 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import java.util.function.Function;
import org.bson.Document;
import org.bson.codecs.Encoder;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;
import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;
/**
* A class responsible for serialization of message keys and values to MongoDB compatible JSON
*
* @author Jiri Pechanec
*
*/
class JsonSerialization {
@FunctionalInterface
public static interface Transformer extends Function<Document, String> {
}
/**
* Common settings for writing JSON strings using a compact JSON format
*/
public static final JsonWriterSettings COMPACT_JSON_SETTINGS = JsonWriterSettings.builder()
.outputMode(JsonMode.STRICT)
.indent(true)
.indentCharacters("")
.newLineCharacters("")
.build();
/**
* Common settings for writing JSON strings using a compact JSON format
*/
private static final JsonWriterSettings SIMPLE_JSON_SETTINGS = JsonWriterSettings.builder()
.outputMode(JsonMode.RELAXED)
.indent(true)
.indentCharacters("")
.newLineCharacters("")
.build();
private final Transformer transformer;
public JsonSerialization() {
final Encoder<Document> encoder = MongoClient.getDefaultCodecRegistry().get(Document.class);
transformer = (doc) -> doc.toJson(COMPACT_JSON_SETTINGS, encoder);
}
public String getDocumentId(Document document) {
if (document == null) {
return null;
}
// The serialized value is in format {"_": xxx} so we need to remove the starting dummy field name and closing brace
final String keyValue = new BasicDBObject("_", document.get(DBCollection.ID_FIELD_NAME)).toJson(SIMPLE_JSON_SETTINGS);
final int start = 6;
final int end = keyValue.length() - 1;
if (!(end > start)) {
throw new IllegalStateException("Serialized JSON object '" + keyValue + "' is not in expected format");
}
return keyValue.substring(start, end);
}
public String getDocumentValue(Document document) {
return transformer.apply(document);
}
public Transformer getTransformer() {
return transformer;
}
}

View File

@ -7,22 +7,12 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.bson.Document;
import org.bson.codecs.Encoder;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;
import com.mongodb.util.JSONSerializers;
import com.mongodb.util.ObjectSerializer;
import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.mongodb.FieldSelector.FieldFilter;
import io.debezium.data.Envelope;
@ -40,31 +30,19 @@
@ThreadSafe
public class MongoDbSchema implements DatabaseSchema<CollectionId> {
/**
* Common settings for writing JSON strings using a compact JSON format
*/
public static final JsonWriterSettings COMPACT_JSON_SETTINGS = JsonWriterSettings.builder()
.outputMode(JsonMode.STRICT)
.indent(true)
.indentCharacters("")
.newLineCharacters("")
.build();
private static final ObjectSerializer jsonSerializer = JSONSerializers.getStrict();
private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSchema.class);
private final Filters filters;
private final TopicSelector<CollectionId> topicSelector;
private final Schema sourceSchema;
private final SchemaNameAdjuster adjuster = SchemaNameAdjuster.create(LOGGER);
private final Function<Document, String> valueTransformer;
private final ConcurrentMap<CollectionId, MongoDbCollectionSchema> collections = new ConcurrentHashMap<>();
private final JsonSerialization serialization = new JsonSerialization();
public MongoDbSchema(Filters filters, TopicSelector<CollectionId> topicSelector, Schema sourceSchema) {
this.filters = filters;
this.topicSelector = topicSelector;
this.sourceSchema = sourceSchema;
this.valueTransformer = resolveValueTransformer();
}
@Override
@ -99,10 +77,10 @@ public DataCollectionSchema schemaFor(CollectionId collectionId) {
id,
fieldFilter,
keySchema,
this::getDocumentId,
serialization::getDocumentId,
envelope,
valueSchema,
this::getDocumentValue);
serialization::getDocumentValue);
});
}
@ -118,20 +96,4 @@ public void assureNonEmptySchema() {
LOGGER.warn(DatabaseSchema.NO_CAPTURED_DATA_COLLECTIONS_WARNING);
}
}
private String getDocumentId(Document document) {
if (document == null) {
return null;
}
return jsonSerializer.serialize(document.get(DBCollection.ID_FIELD_NAME));
}
private String getDocumentValue(Document document) {
return valueTransformer.apply(document);
}
private static Function<Document, String> resolveValueTransformer() {
Encoder<Document> encoder = MongoClient.getDefaultCodecRegistry().get(Document.class);
return (doc) -> doc.toJson(COMPACT_JSON_SETTINGS, encoder);
}
}

View File

@ -5,7 +5,7 @@
*/
package io.debezium.connector.mongodb;
import static io.debezium.connector.mongodb.MongoDbSchema.COMPACT_JSON_SETTINGS;
import static io.debezium.connector.mongodb.JsonSerialization.COMPACT_JSON_SETTINGS;
import static io.debezium.data.Envelope.FieldName.AFTER;
import static org.fest.assertions.Assertions.assertThat;
import static org.fest.assertions.Fail.fail;

View File

@ -5,7 +5,7 @@
*/
package io.debezium.connector.mongodb;
import static io.debezium.connector.mongodb.MongoDbSchema.COMPACT_JSON_SETTINGS;
import static io.debezium.connector.mongodb.JsonSerialization.COMPACT_JSON_SETTINGS;
import static io.debezium.data.Envelope.FieldName.AFTER;
import static org.fest.assertions.Assertions.assertThat;
import static org.fest.assertions.Fail.fail;

View File

@ -5,7 +5,7 @@
*/
package io.debezium.connector.mongodb;
import static io.debezium.connector.mongodb.MongoDbSchema.COMPACT_JSON_SETTINGS;
import static io.debezium.connector.mongodb.JsonSerialization.COMPACT_JSON_SETTINGS;
import static io.debezium.data.Envelope.FieldName.AFTER;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.fail;

View File

@ -5,7 +5,8 @@
*/
package io.debezium.connector.mongodb;
import static io.debezium.connector.mongodb.MongoDbSchema.COMPACT_JSON_SETTINGS;
import static io.debezium.connector.mongodb.JsonSerialization.COMPACT_JSON_SETTINGS;
import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.fail;
@ -13,6 +14,8 @@
import java.io.InputStream;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashSet;
@ -45,8 +48,6 @@
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.InsertOneOptions;
import com.mongodb.util.JSON;
import com.mongodb.util.JSONSerializers;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
@ -333,8 +334,8 @@ public void shouldConsumeAllEventsFromDatabase() throws InterruptedException, IO
Testing.debug("Update event: " + updateRecord);
Struct insertKey = (Struct) insertRecord.key();
Struct updateKey = (Struct) updateRecord.key();
String insertId = JSON.parse(insertKey.getString("id")).toString();
String updateId = JSON.parse(updateKey.getString("id")).toString();
String insertId = toObjectId(insertKey.getString("id")).toString();
String updateId = toObjectId(updateKey.getString("id")).toString();
assertThat(insertId).isEqualTo(id.get());
assertThat(updateId).isEqualTo(id.get());
@ -364,7 +365,7 @@ public void shouldConsumeAllEventsFromDatabase() throws InterruptedException, IO
Testing.debug("Delete event: " + deleteRecord);
Testing.debug("Tombstone event: " + tombStoneRecord);
Struct deleteKey = (Struct) deleteRecord.key();
String deleteId = JSON.parse(deleteKey.getString("id")).toString();
String deleteId = toObjectId(deleteKey.getString("id")).toString();
assertThat(deleteId).isEqualTo(id.get());
}
@ -1348,7 +1349,7 @@ public void shouldGenerateRecordForInsertEvent() throws Exception {
final Struct value = (Struct) deleteRecord.value();
assertThat(key.schema()).isSameAs(deleteRecord.keySchema());
assertThat(key.get("id")).isEqualTo("{ \"$oid\" : \"" + objId + "\"}");
assertThat(key.get("id")).isEqualTo(formatObjectId(objId));
assertThat(value.schema()).isSameAs(deleteRecord.valueSchema());
// assertThat(value.getString(Envelope.FieldName.BEFORE)).isNull();
@ -1402,7 +1403,7 @@ public void shouldGenerateRecordForUpdateEvent() throws Exception {
final Struct value = (Struct) deleteRecord.value();
assertThat(key.schema()).isSameAs(deleteRecord.keySchema());
assertThat(key.get("id")).isEqualTo(JSONSerializers.getStrict().serialize(objId));
assertThat(key.get("id")).isEqualTo(formatObjectId(objId));
Document patchObj = Document.parse(value.getString(MongoDbFieldName.PATCH));
patchObj.remove("$v");
@ -1455,7 +1456,7 @@ public void shouldGeneratorRecordForDeleteEvent() throws Exception {
final Struct value = (Struct) deleteRecord.value();
assertThat(key.schema()).isSameAs(deleteRecord.keySchema());
assertThat(key.get("id")).isEqualTo(JSONSerializers.getStrict().serialize(objId));
assertThat(key.get("id")).isEqualTo(formatObjectId(objId));
assertThat(value.schema()).isSameAs(deleteRecord.valueSchema());
assertThat(value.getString(Envelope.FieldName.AFTER)).isNull();
@ -1470,7 +1471,7 @@ public void shouldGeneratorRecordForDeleteEvent() throws Exception {
final SourceRecord tombstoneRecord = records.allRecordsInOrder().get(1);
final Struct tombstoneKey = (Struct) tombstoneRecord.key();
assertThat(tombstoneKey.schema()).isSameAs(tombstoneRecord.keySchema());
assertThat(tombstoneKey.get("id")).isEqualTo(JSONSerializers.getStrict().serialize(objId));
assertThat(tombstoneKey.get("id")).isEqualTo(formatObjectId(objId));
assertThat(tombstoneRecord.value()).isNull();
assertThat(tombstoneRecord.valueSchema()).isNull();
}
@ -1514,7 +1515,7 @@ public void shouldGenerateRecordForDeleteEventWithoutTombstone() throws Exceptio
final Struct value = (Struct) record.value();
assertThat(key.schema()).isSameAs(record.keySchema());
assertThat(key.get("id")).isEqualTo(JSONSerializers.getStrict().serialize(objId));
assertThat(key.get("id")).isEqualTo(formatObjectId(objId));
assertThat(value.schema()).isSameAs(record.valueSchema());
assertThat(value.getString(Envelope.FieldName.AFTER)).isNull();
@ -1581,13 +1582,14 @@ public void shouldGenerateRecordsWithCorrectlySerializedId() throws Exception {
assertSourceRecordKeyFieldIsEqualTo(sourceRecords.get(0), "id", "2147483657");
assertSourceRecordKeyFieldIsEqualTo(sourceRecords.get(1), "id", "\"123\"");
assertSourceRecordKeyFieldIsEqualTo(sourceRecords.get(2), "id", "{ \"company\" : 32 , \"dept\" : \"home improvement\"}");
assertSourceRecordKeyFieldIsEqualTo(sourceRecords.get(2), "id", "{\"company\": 32,\"dept\": \"home improvement\"}");
// that's actually not what https://docs.mongodb.com/manual/reference/mongodb-extended-json/#date suggests;
// seems JsonSerializers is not fully compliant with that description
assertSourceRecordKeyFieldIsEqualTo(sourceRecords.get(3), "id", "{ \"$date\" : " + cal.getTime().getTime() + "}");
assertSourceRecordKeyFieldIsEqualTo(sourceRecords.get(3), "id",
"{\"$date\": \"" + ZonedDateTime.ofInstant(Instant.ofEpochMilli(cal.getTimeInMillis()), ZoneId.of("Z")).format(ISO_OFFSET_DATE_TIME) + "\"}");
if (decimal128Supported) {
assertSourceRecordKeyFieldIsEqualTo(sourceRecords.get(4), "id", "{ \"$numberDecimal\" : \"123.45678\"}");
assertSourceRecordKeyFieldIsEqualTo(sourceRecords.get(4), "id", "{\"$numberDecimal\": \"123.45678\"}");
}
}
@ -1627,7 +1629,7 @@ public void shouldSupportDbRef2() throws Exception {
final Struct key = (Struct) record.key();
final Struct value = (Struct) record.value();
assertThat(key.schema()).isSameAs(record.keySchema());
assertThat(key.get("id")).isEqualTo("{ \"$oid\" : \"" + objId + "\"}");
assertThat(key.get("id")).isEqualTo(formatObjectId(objId));
assertThat(value.schema()).isSameAs(record.valueSchema());
@ -1758,7 +1760,7 @@ public void shouldReplicateContent() throws Exception {
VerifyRecord.isValid(record);
final Struct key = (Struct) record.key();
final ObjectId id = (ObjectId) (JSON.parse(key.getString("id")));
final ObjectId id = toObjectId(key.getString("id"));
foundIds.add(id);
if (record.value() != null) {
final Struct value = (Struct) record.value();
@ -1896,7 +1898,7 @@ public void shouldGenerateRecordForUpdateEventUsingLegacyV1SourceInfo() throws E
final Struct value = (Struct) deleteRecord.value();
assertThat(key.schema()).isSameAs(deleteRecord.keySchema());
assertThat(key.get("id")).isEqualTo(JSONSerializers.getStrict().serialize(objId));
assertThat(key.get("id")).isEqualTo(formatObjectId(objId));
Document patchObj = Document.parse(value.getString(MongoDbFieldName.PATCH));
patchObj.remove("$v");
@ -1908,6 +1910,10 @@ public void shouldGenerateRecordForUpdateEventUsingLegacyV1SourceInfo() throws E
assertThat(value.getInt64(Envelope.FieldName.TIMESTAMP)).isGreaterThanOrEqualTo(timestamp.toEpochMilli());
}
private String formatObjectId(ObjectId objId) {
return "{\"$oid\": \"" + objId + "\"}";
}
private void insertDocuments(String dbName, String collectionName, Document... documents) {
primary().execute("store documents", mongo -> {
Testing.debug("Storing in '" + dbName + "." + collectionName + "' document");
@ -1939,4 +1945,8 @@ private void deleteDocument(String dbName, String collectionName, ObjectId objec
coll.deleteOne(filter);
});
}
private ObjectId toObjectId(String oid) {
return new ObjectId(oid.substring(10, oid.length() - 2));
}
}