DBZ-7917 Manage case when high watermark is not yet retrieved

This commit is contained in:
mfvitale 2024-06-11 12:12:34 +02:00 committed by Jiri Pechanec
parent 7225e93722
commit 9339998c36
2 changed files with 6 additions and 2 deletions

View File

@ -137,6 +137,7 @@ private void readUntilNewTransactionChange(P partition, OffsetContext offsetCont
LOGGER.debug("Event txId {}, snapshot is running {}, reachedHighWatermark {}",
eventTxId, getContext().snapshotRunning(), getContext().isTransactionVisible(eventTxId));
LOGGER.trace("Incremental snapshot context {}", getContext());
if (getContext().snapshotRunning() && maxInProgressTransactionCommitted(eventTxId)) {
getContext().closeWindow();
sendWindowEvents(partition, offsetContext);
@ -146,8 +147,10 @@ private void readUntilNewTransactionChange(P partition, OffsetContext offsetCont
}
while (getContext().snapshotRunning() && getContext().isTransactionVisible(eventTxId)) {
LOGGER.debug("Finishing snapshot, snapshot is running {}, reachedHighWatermark {}", getContext().snapshotRunning(),
getContext().isTransactionVisible(eventTxId));
getContext().closeWindow();
sendWindowEvents(partition, offsetContext);
readChunk(partition, offsetContext);
@ -155,6 +158,7 @@ private void readUntilNewTransactionChange(P partition, OffsetContext offsetCont
LOGGER.trace("Watermarks changed");
return;
}
LOGGER.trace("Re read chunk finished, snapshot is running {}, reachedHighWatermark {}", getContext().snapshotRunning(),
getContext().isTransactionVisible(eventTxId));
}
@ -182,7 +186,7 @@ private void getCurrentSnapshot(Consumer<PgSnapshot> watermark) {
private boolean maxInProgressTransactionCommitted(Long eventTxId) {
if (getContext().getHighWatermark() == null) {
return true;
return false;
}
return getContext().getHighWatermark().getXMax().equals(eventTxId);

View File

@ -96,7 +96,7 @@ public void closeWindow() {
public boolean isTransactionVisible(Long eventTxId) {
if (highWatermark == null) {
return true;
return false;
}
return eventTxId.compareTo(highWatermark.getXMin()) <= 0;
}