diff --git a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java index c97d77e9a..2ccd23615 100644 --- a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java +++ b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java @@ -774,6 +774,7 @@ public JdbcConnection prepareUpdate(String stmt, StatementPreparer preparer) thr if (preparer != null) { preparer.accept(statement); } + LOGGER.trace("Executing statement '{}'", stmt); statement.execute(); return this; } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotContext.java b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotContext.java index 002dbdb4a..e109f8a70 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotContext.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotContext.java @@ -78,8 +78,8 @@ public AbstractIncrementalSnapshotContext(boolean useCatalogBeforeSchema) { } public boolean openWindow(String id) { - if (!id.startsWith(currentChunkId)) { - LOGGER.info("Arrived request to open window with id = '{}', expected = '{}', request ignored", id, currentChunkId); + if (notExpectedChunk(id)) { + LOGGER.info("Received request to open window with id = '{}', expected = '{}', request ignored", id, currentChunkId); return false; } LOGGER.debug("Opening window for incremental snapshot chunk"); @@ -88,8 +88,8 @@ public boolean openWindow(String id) { } public boolean closeWindow(String id) { - if (!id.startsWith(currentChunkId)) { - LOGGER.info("Arrived request to close window with id = '{}', expected = '{}', request ignored", id, currentChunkId); + if (notExpectedChunk(id)) { + LOGGER.info("Received request to close window with id = '{}', expected = '{}', request ignored", id, currentChunkId); return false; } LOGGER.debug("Closing window for incremental snapshot chunk"); @@ -97,6 +97,17 @@ public boolean closeWindow(String id) { return true; } + /** + * The snapshotting process can receive out-of-order windowing signals after connector restart + * as depending on committed offset position some signals can be replayed. + * In extreme case a signal can be received even when the incremental snapshot was completed just + * before the restart. + * Such windowing signals are ignored. + */ + private boolean notExpectedChunk(String id) { + return currentChunkId == null || !id.startsWith(currentChunkId); + } + public boolean deduplicationNeeded() { return windowOpened; } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/SignalBasedIncrementalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/SignalBasedIncrementalSnapshotChangeEventSource.java index cad818757..ef27ffcfd 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/SignalBasedIncrementalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/SignalBasedIncrementalSnapshotChangeEventSource.java @@ -54,6 +54,7 @@ public void processMessage(Partition partition, DataCollectionId dataCollectionI @Override protected void emitWindowOpen() throws SQLException { jdbcConnection.prepareUpdate(signalWindowStatement, x -> { + LOGGER.trace("Emitting open window for chunk = '{}'", context.currentChunkId()); x.setString(1, context.currentChunkId() + "-open"); x.setString(2, OpenIncrementalSnapshotWindow.NAME); }); @@ -63,6 +64,7 @@ protected void emitWindowOpen() throws SQLException { @Override protected void emitWindowClose() throws SQLException { jdbcConnection.prepareUpdate(signalWindowStatement, x -> { + LOGGER.trace("Emitting close window for chunk = '{}'", context.currentChunkId()); x.setString(1, context.currentChunkId() + "-close"); x.setString(2, CloseIncrementalSnapshotWindow.NAME); });