From 9e5661b573fbb09e5ccd8ef900ad0acfc99ac57e Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Thu, 9 Dec 2021 08:40:30 +0100 Subject: [PATCH] DBZ-3342 Addressed review comments --- .../MongoDbIncrementalSnapshotChangeEventSource.java | 12 ++++++++++-- .../mongodb/MongoDbIncrementalSnapshotContext.java | 4 +--- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java index 69a5bf127..4db7fc384 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java @@ -157,7 +157,11 @@ protected void emitWindowOpen() throws InterruptedException { final MongoCollection collection = database.getCollection(collectionId.name()); LOGGER.trace("Emitting open window for chunk = '{}'", context.currentChunkId()); - collection.insertOne(Document.parse("{\"_id\": \"" + id + "\", \"type\": \"" + OpenIncrementalSnapshotWindow.NAME + "\", \"payload\": \"\"}")); + final Document signal = new Document(); + signal.put(DOCUMENT_ID, id); + signal.put("type", OpenIncrementalSnapshotWindow.NAME); + signal.put("payload", ""); + collection.insertOne(signal); }); } @@ -174,7 +178,11 @@ protected void emitWindowClose() throws InterruptedException { final MongoCollection collection = database.getCollection(collectionId.name()); LOGGER.trace("Emitting close window for chunk = '{}'", context.currentChunkId()); - collection.insertOne(Document.parse("{\"_id\": \"" + id + "\", \"type\": \"" + CloseIncrementalSnapshotWindow.NAME + "\", \"payload\": \"\"}")); + final Document signal = new Document(); + signal.put(DOCUMENT_ID, id); + signal.put("type", CloseIncrementalSnapshotWindow.NAME); + signal.put("payload", ""); + collection.insertOne(signal); }); } diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotContext.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotContext.java index 99ed37405..67ab6e5fe 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotContext.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotContext.java @@ -29,7 +29,7 @@ import io.debezium.util.HexConverter; /** - * A class describing current state of incremental snapshot of MongoDB connector + * Describes current state of incremental snapshot of MongoDB connector * * @author Jiri Pechanec * @@ -60,7 +60,6 @@ public class MongoDbIncrementalSnapshotContext implements IncrementalSnapshot // State to be stored and recovered from offsets private final Queue dataCollectionsToSnapshot = new LinkedList<>(); - private final boolean useCatalogBeforeSchema; /** * The PK of the last record that was passed to Kafka Connect. In case of * connector restart the start of the first chunk will be populated from it. @@ -79,7 +78,6 @@ public class MongoDbIncrementalSnapshotContext implements IncrementalSnapshot private boolean schemaVerificationPassed; public MongoDbIncrementalSnapshotContext(boolean useCatalogBeforeSchema) { - this.useCatalogBeforeSchema = useCatalogBeforeSchema; } public boolean openWindow(String id) {