diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java index 66319f640..15c2af574 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java @@ -226,6 +226,11 @@ public String toString() { } public static PostgresOffsetContext initialContext(PostgresConnectorConfig connectorConfig, PostgresConnection jdbcConnection, Clock clock) { + return initialContext(connectorConfig, jdbcConnection, clock, null, null); + } + + public static PostgresOffsetContext initialContext(PostgresConnectorConfig connectorConfig, PostgresConnection jdbcConnection, Clock clock, Lsn lastCommitLsn, + Lsn lastCompletelyProcessedLsn) { try { LOGGER.info("Creating initial offset context"); final Lsn lsn = Lsn.valueOf(jdbcConnection.currentXLogLocation()); @@ -234,8 +239,8 @@ public static PostgresOffsetContext initialContext(PostgresConnectorConfig conne return new PostgresOffsetContext( connectorConfig, lsn, - null, - null, + lastCompletelyProcessedLsn, + lastCommitLsn, txId, clock.currentTimeAsInstant(), false, diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java index 2df89e28a..104920df4 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java @@ -44,6 +44,7 @@ public class PostgresSnapshotChangeEventSource extends RelationalSnapshotChangeE private final Snapshotter snapshotter; private final SlotCreationResult slotCreatedInfo; private final SlotState startingSlotInfo; + private final PostgresOffsetContext previousOffset; public PostgresSnapshotChangeEventSource(PostgresConnectorConfig connectorConfig, Snapshotter snapshotter, PostgresOffsetContext previousOffset, PostgresConnection jdbcConnection, PostgresSchema schema, EventDispatcher dispatcher, Clock clock, @@ -55,6 +56,7 @@ public PostgresSnapshotChangeEventSource(PostgresConnectorConfig connectorConfig this.snapshotter = snapshotter; this.slotCreatedInfo = slotCreatedInfo; this.startingSlotInfo = startingSlotInfo; + this.previousOffset = previousOffset; } @Override @@ -130,7 +132,17 @@ protected void releaseSchemaSnapshotLocks(RelationalSnapshotContext snapshotCont protected void determineSnapshotOffset(RelationalSnapshotContext ctx) throws Exception { PostgresOffsetContext offset = (PostgresOffsetContext) ctx.offset; if (offset == null) { - offset = PostgresOffsetContext.initialContext(connectorConfig, jdbcConnection, getClock()); + if (previousOffset != null && !snapshotter.shouldStreamEventsStartingFromSnapshot()) { + // The connect framework, not the connector, manages triggering committing offset state so the + // replication stream may not have flushed the latest offset state during catch up streaming. + // The previousOffset variable is shared between the catch up streaming and snapshot phases and + // has the latest known offset state. + offset = PostgresOffsetContext.initialContext(connectorConfig, jdbcConnection, getClock(), + previousOffset.lastCommitLsn(), previousOffset.lastCompletelyProcessedLsn()); + } + else { + offset = PostgresOffsetContext.initialContext(connectorConfig, jdbcConnection, getClock()); + } ctx.offset = offset; } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index 6f03f5aaa..dc1bf7881 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -1490,6 +1490,8 @@ public void customSnapshotterSkipsTablesOnRestart() throws Exception { VerifyRecord.isValidRead(s1recs.get(1), PK_FIELD, 1); VerifyRecord.isValidRead(s1recs.get(2), PK_FIELD, 2); + assertNoRecordsToConsume(); + TestHelper.assertNoOpenTransactions(); stopConnector(value -> assertThat(logInterceptor.containsMessage("For table 's2.a' the select statement was not provided, skipping table")).isTrue());