diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java index c90c21355..e50c1f449 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java @@ -81,7 +81,7 @@ private void setSnapshotStartLsn(PostgresSnapshotChangeEventSource snapshotSourc PostgresOffsetContext offsetContext) throws SQLException { snapshotSource.createSnapshotConnection(); - snapshotSource.setSnapshotTransactionIsolationLevel(); + snapshotSource.setSnapshotTransactionIsolationLevel(false); snapshotSource.updateOffsetForPreSnapshotCatchUpStreaming(offsetContext); } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java index eb6ed8117..f6d15af03 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java @@ -98,7 +98,7 @@ protected void connectionCreated(RelationalSnapshotContext getSnapshotSelect(RelationalSnapshotContext snapshotTableLockingStatement(Duration lockTimeout, Set< } @Override - public String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo) { - if (newSlotInfo != null) { + public String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo, boolean isOnDemand) { + if (newSlotInfo != null && !isOnDemand) { + /* + * For an on demand blocking snapshot we don't need to reuse + * the same snapshot from the existing exported transaction as for the initial snapshot. + */ String snapSet = String.format("SET TRANSACTION SNAPSHOT '%s';", newSlotInfo.snapshotName()); return "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; \n" + snapSet; } - return Snapshotter.super.snapshotTransactionIsolationLevelStatement(newSlotInfo); + return Snapshotter.super.snapshotTransactionIsolationLevelStatement(newSlotInfo, isOnDemand); } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/spi/Snapshotter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/spi/Snapshotter.java index 42ede7444..75aa638eb 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/spi/Snapshotter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/spi/Snapshotter.java @@ -72,7 +72,7 @@ default boolean shouldStreamEventsStartingFromSnapshot() { * @param newSlotInfo if a new slot was created for snapshotting, this contains information from * the `create_replication_slot` command */ - default String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo) { + default String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo, boolean isOnDemand) { // we're using the same isolation level that pg_backup uses return "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE;"; } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/BlockingSnapshotIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/BlockingSnapshotIT.java index fee9a77b7..7f5d129a0 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/BlockingSnapshotIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/BlockingSnapshotIT.java @@ -68,7 +68,7 @@ protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean s return TestHelper.defaultConfig() .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue()) - .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) + .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE) .with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "s1.debezium_signal") .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5) .with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10) @@ -148,4 +148,35 @@ public void executeBlockingSnapshotWhenSnapshotModeIsNever() throws Exception { assertStreamingRecordsArePresent(ROW_COUNT, consumeRecordsByTopic(ROW_COUNT, 10)); } + + @FixFor("DBZ-7312") + @Test + public void executeBlockingSnapshotWhenASnapshotAlreadyExecuted() throws Exception { + // Testing.Print.enable(); + + // Avoid to start the streaming from data inserted before the connector start + TestHelper.dropDefaultReplicationSlot(); + TestHelper.dropPublication(); + + populateTable(); + + startConnectorWithSnapshot(x -> mutableConfig(true, true) + .with(CommonConnectorConfig.SNAPSHOT_MODE_TABLES, "not exist") + .with(PostgresConnectorConfig.SLOT_NAME, "snapshot_mode_initial_crash4") + + ); + + sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "", BLOCKING, tableDataCollectionId()); + + waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class); + + int signalingRecords = 1; + + assertRecordsFromSnapshotAndStreamingArePresent(ROW_COUNT, consumeRecordsByTopic(ROW_COUNT + signalingRecords, 10)); + + insertRecords(ROW_COUNT, ROW_COUNT * 2); + + assertStreamingRecordsArePresent(ROW_COUNT, consumeRecordsByTopic(ROW_COUNT, 10)); + + } } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CustomTestSnapshot.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CustomTestSnapshot.java index 9bbb3c035..f5ea83d4a 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CustomTestSnapshot.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CustomTestSnapshot.java @@ -54,7 +54,7 @@ public Optional buildSnapshotQuery(TableId tableId, List snapsho } @Override - public String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo) { + public String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo, boolean isOnDemand) { if (newSlotInfo != null) { String snapSet = String.format("SET TRANSACTION SNAPSHOT '%s';", newSlotInfo.snapshotName()); return "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; \n" + snapSet;