DBZ-1052 Flush LSN for skipped tx messages

This commit is contained in:
Jiri Pechanec 2020-01-31 14:07:13 +01:00 committed by Gunnar Morling
parent 9dbea89f0a
commit 010e0c6fa6

View File

@ -121,14 +121,12 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
final Long lsn = stream.lastReceivedLsn();
if (message == null) {
LOGGER.trace("Received empty message");
lastCompletelyProcessedLsn = lsn;
offsetContext.updateWalPosition(lsn, lastCompletelyProcessedLsn, null, null, null, taskContext.getSlotXmin(connection));
maybeWarnAboutGrowingWalBacklog(false);
dispatcher.dispatchHeartbeatEvent(offsetContext);
skipMessage(lsn);
return;
}
if (message.isTransactionalMessage() && !connectorConfig.shouldProvideTransactionMetadata()) {
LOGGER.trace("Received transactional message {}", message);
skipMessage(lsn);
return;
}
if (message.isLastEventForLsn()) {
@ -205,6 +203,13 @@ else if (message.getOperation() == Operation.COMMIT) {
}
}
private void skipMessage(final Long lsn) throws SQLException, InterruptedException {
lastCompletelyProcessedLsn = lsn;
offsetContext.updateWalPosition(lsn, lastCompletelyProcessedLsn, null, null, null, taskContext.getSlotXmin(connection));
maybeWarnAboutGrowingWalBacklog(false);
dispatcher.dispatchHeartbeatEvent(offsetContext);
}
/**
* If we receive change events but all of them get filtered out, we cannot
* commit any new offset with Apache Kafka. This in turn means no LSN is ever