DBZ-3901 Guard against chunk signal coming when none expected

This commit is contained in:
Jiri Pechanec 2021-08-24 09:00:24 +02:00
parent 50c156f061
commit 4396513795
3 changed files with 18 additions and 4 deletions

View File

@ -774,6 +774,7 @@ public JdbcConnection prepareUpdate(String stmt, StatementPreparer preparer) thr
if (preparer != null) {
preparer.accept(statement);
}
LOGGER.trace("Executing statement '{}'", stmt);
statement.execute();
return this;
}

View File

@ -78,8 +78,8 @@ public AbstractIncrementalSnapshotContext(boolean useCatalogBeforeSchema) {
}
public boolean openWindow(String id) {
if (!id.startsWith(currentChunkId)) {
LOGGER.info("Arrived request to open window with id = '{}', expected = '{}', request ignored", id, currentChunkId);
if (notExpectedChunk(id)) {
LOGGER.info("Received request to open window with id = '{}', expected = '{}', request ignored", id, currentChunkId);
return false;
}
LOGGER.debug("Opening window for incremental snapshot chunk");
@ -88,8 +88,8 @@ public boolean openWindow(String id) {
}
public boolean closeWindow(String id) {
if (!id.startsWith(currentChunkId)) {
LOGGER.info("Arrived request to close window with id = '{}', expected = '{}', request ignored", id, currentChunkId);
if (notExpectedChunk(id)) {
LOGGER.info("Received request to close window with id = '{}', expected = '{}', request ignored", id, currentChunkId);
return false;
}
LOGGER.debug("Closing window for incremental snapshot chunk");
@ -97,6 +97,17 @@ public boolean closeWindow(String id) {
return true;
}
/**
* The snapshotting process can receive out-of-order windowing signals after connector restart
* as depending on committed offset position some signals can be replayed.
* In extreme case a signal can be received even when the incremental snapshot was completed just
* before the restart.
* Such windowing signals are ignored.
*/
private boolean notExpectedChunk(String id) {
return currentChunkId == null || !id.startsWith(currentChunkId);
}
public boolean deduplicationNeeded() {
return windowOpened;
}

View File

@ -54,6 +54,7 @@ public void processMessage(Partition partition, DataCollectionId dataCollectionI
@Override
protected void emitWindowOpen() throws SQLException {
jdbcConnection.prepareUpdate(signalWindowStatement, x -> {
LOGGER.trace("Emitting open window for chunk = '{}'", context.currentChunkId());
x.setString(1, context.currentChunkId() + "-open");
x.setString(2, OpenIncrementalSnapshotWindow.NAME);
});
@ -63,6 +64,7 @@ protected void emitWindowOpen() throws SQLException {
@Override
protected void emitWindowClose() throws SQLException {
jdbcConnection.prepareUpdate(signalWindowStatement, x -> {
LOGGER.trace("Emitting close window for chunk = '{}'", context.currentChunkId());
x.setString(1, context.currentChunkId() + "-close");
x.setString(2, CloseIncrementalSnapshotWindow.NAME);
});