diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresReadOnlyIncrementalSnapshotChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresReadOnlyIncrementalSnapshotChangeEventSource.java index eb8a890dd..61cdaa872 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresReadOnlyIncrementalSnapshotChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresReadOnlyIncrementalSnapshotChangeEventSource.java @@ -137,6 +137,7 @@ private void readUntilNewTransactionChange(P partition, OffsetContext offsetCont LOGGER.debug("Event txId {}, snapshot is running {}, reachedHighWatermark {}", eventTxId, getContext().snapshotRunning(), getContext().isTransactionVisible(eventTxId)); + LOGGER.trace("Incremental snapshot context {}", getContext()); if (getContext().snapshotRunning() && maxInProgressTransactionCommitted(eventTxId)) { getContext().closeWindow(); sendWindowEvents(partition, offsetContext); @@ -146,8 +147,10 @@ private void readUntilNewTransactionChange(P partition, OffsetContext offsetCont } while (getContext().snapshotRunning() && getContext().isTransactionVisible(eventTxId)) { + LOGGER.debug("Finishing snapshot, snapshot is running {}, reachedHighWatermark {}", getContext().snapshotRunning(), getContext().isTransactionVisible(eventTxId)); + getContext().closeWindow(); sendWindowEvents(partition, offsetContext); readChunk(partition, offsetContext); @@ -155,6 +158,7 @@ private void readUntilNewTransactionChange(P partition, OffsetContext offsetCont LOGGER.trace("Watermarks changed"); return; } + LOGGER.trace("Re read chunk finished, snapshot is running {}, reachedHighWatermark {}", getContext().snapshotRunning(), getContext().isTransactionVisible(eventTxId)); } @@ -182,7 +186,7 @@ private void getCurrentSnapshot(Consumer watermark) { private boolean maxInProgressTransactionCommitted(Long eventTxId) { if (getContext().getHighWatermark() == null) { - return true; + return false; } return getContext().getHighWatermark().getXMax().equals(eventTxId); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresReadOnlyIncrementalSnapshotContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresReadOnlyIncrementalSnapshotContext.java index fd5be1cb4..038e3d89e 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresReadOnlyIncrementalSnapshotContext.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresReadOnlyIncrementalSnapshotContext.java @@ -96,7 +96,7 @@ public void closeWindow() { public boolean isTransactionVisible(Long eventTxId) { if (highWatermark == null) { - return true; + return false; } return eventTxId.compareTo(highWatermark.getXMin()) <= 0; }