DBZ-3342 Addressed review comments

This commit is contained in:
Jiri Pechanec 2021-12-09 08:40:30 +01:00 committed by Gunnar Morling
parent 632f228581
commit 9e5661b573
2 changed files with 11 additions and 5 deletions

View File

@ -157,7 +157,11 @@ protected void emitWindowOpen() throws InterruptedException {
final MongoCollection<Document> collection = database.getCollection(collectionId.name());
LOGGER.trace("Emitting open window for chunk = '{}'", context.currentChunkId());
collection.insertOne(Document.parse("{\"_id\": \"" + id + "\", \"type\": \"" + OpenIncrementalSnapshotWindow.NAME + "\", \"payload\": \"\"}"));
final Document signal = new Document();
signal.put(DOCUMENT_ID, id);
signal.put("type", OpenIncrementalSnapshotWindow.NAME);
signal.put("payload", "");
collection.insertOne(signal);
});
}
@ -174,7 +178,11 @@ protected void emitWindowClose() throws InterruptedException {
final MongoCollection<Document> collection = database.getCollection(collectionId.name());
LOGGER.trace("Emitting close window for chunk = '{}'", context.currentChunkId());
collection.insertOne(Document.parse("{\"_id\": \"" + id + "\", \"type\": \"" + CloseIncrementalSnapshotWindow.NAME + "\", \"payload\": \"\"}"));
final Document signal = new Document();
signal.put(DOCUMENT_ID, id);
signal.put("type", CloseIncrementalSnapshotWindow.NAME);
signal.put("payload", "");
collection.insertOne(signal);
});
}

View File

@ -29,7 +29,7 @@
import io.debezium.util.HexConverter;
/**
* A class describing current state of incremental snapshot of MongoDB connector
* Describes current state of incremental snapshot of MongoDB connector
*
* @author Jiri Pechanec
*
@ -60,7 +60,6 @@ public class MongoDbIncrementalSnapshotContext<T> implements IncrementalSnapshot
// State to be stored and recovered from offsets
private final Queue<T> dataCollectionsToSnapshot = new LinkedList<>();
private final boolean useCatalogBeforeSchema;
/**
* The PK of the last record that was passed to Kafka Connect. In case of
* connector restart the start of the first chunk will be populated from it.
@ -79,7 +78,6 @@ public class MongoDbIncrementalSnapshotContext<T> implements IncrementalSnapshot
private boolean schemaVerificationPassed;
public MongoDbIncrementalSnapshotContext(boolean useCatalogBeforeSchema) {
this.useCatalogBeforeSchema = useCatalogBeforeSchema;
}
public boolean openWindow(String id) {