DBZ-7312 On demand blocking snapshot will not set transaction snapshot

This commit is contained in:
mfvitale 2024-01-11 16:29:45 +01:00 committed by Jiri Pechanec
parent b905016071
commit 5d088cffca
6 changed files with 45 additions and 10 deletions

View File

@ -81,7 +81,7 @@ private void setSnapshotStartLsn(PostgresSnapshotChangeEventSource snapshotSourc
PostgresOffsetContext offsetContext)
throws SQLException {
snapshotSource.createSnapshotConnection();
snapshotSource.setSnapshotTransactionIsolationLevel();
snapshotSource.setSnapshotTransactionIsolationLevel(false);
snapshotSource.updateOffsetForPreSnapshotCatchUpStreaming(offsetContext);
}

View File

@ -98,7 +98,7 @@ protected void connectionCreated(RelationalSnapshotContext<PostgresPartition, Po
// level on the transaction used in catch up streaming has already set the isolation level and executed
// statements, the transaction does not need to get set the level again here.
if (snapshotter.shouldStreamEventsStartingFromSnapshot() && startingSlotInfo == null) {
setSnapshotTransactionIsolationLevel();
setSnapshotTransactionIsolationLevel(snapshotContext.onDemand);
}
schema.refresh(jdbcConnection, false);
}
@ -249,9 +249,9 @@ protected Optional<String> getSnapshotSelect(RelationalSnapshotContext<PostgresP
return snapshotter.buildSnapshotQuery(tableId, columns);
}
protected void setSnapshotTransactionIsolationLevel() throws SQLException {
protected void setSnapshotTransactionIsolationLevel(boolean isOnDemand) throws SQLException {
LOGGER.info("Setting isolation level");
String transactionStatement = snapshotter.snapshotTransactionIsolationLevelStatement(slotCreatedInfo);
String transactionStatement = snapshotter.snapshotTransactionIsolationLevelStatement(slotCreatedInfo, isOnDemand);
LOGGER.info("Opening transaction with statement {}", transactionStatement);
jdbcConnection.executeWithoutCommitting(transactionStatement);
}

View File

@ -38,11 +38,15 @@ public Optional<String> snapshotTableLockingStatement(Duration lockTimeout, Set<
}
@Override
public String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo) {
if (newSlotInfo != null) {
public String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo, boolean isOnDemand) {
if (newSlotInfo != null && !isOnDemand) {
/*
* For an on demand blocking snapshot we don't need to reuse
* the same snapshot from the existing exported transaction as for the initial snapshot.
*/
String snapSet = String.format("SET TRANSACTION SNAPSHOT '%s';", newSlotInfo.snapshotName());
return "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; \n" + snapSet;
}
return Snapshotter.super.snapshotTransactionIsolationLevelStatement(newSlotInfo);
return Snapshotter.super.snapshotTransactionIsolationLevelStatement(newSlotInfo, isOnDemand);
}
}

View File

@ -72,7 +72,7 @@ default boolean shouldStreamEventsStartingFromSnapshot() {
* @param newSlotInfo if a new slot was created for snapshotting, this contains information from
* the `create_replication_slot` command
*/
default String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo) {
default String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo, boolean isOnDemand) {
// we're using the same isolation level that pg_backup uses
return "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE;";
}

View File

@ -68,7 +68,7 @@ protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean s
return TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
.with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "s1.debezium_signal")
.with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5)
.with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)
@ -148,4 +148,35 @@ public void executeBlockingSnapshotWhenSnapshotModeIsNever() throws Exception {
assertStreamingRecordsArePresent(ROW_COUNT, consumeRecordsByTopic(ROW_COUNT, 10));
}
@FixFor("DBZ-7312")
@Test
public void executeBlockingSnapshotWhenASnapshotAlreadyExecuted() throws Exception {
// Testing.Print.enable();
// Avoid to start the streaming from data inserted before the connector start
TestHelper.dropDefaultReplicationSlot();
TestHelper.dropPublication();
populateTable();
startConnectorWithSnapshot(x -> mutableConfig(true, true)
.with(CommonConnectorConfig.SNAPSHOT_MODE_TABLES, "not exist")
.with(PostgresConnectorConfig.SLOT_NAME, "snapshot_mode_initial_crash4")
);
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "", BLOCKING, tableDataCollectionId());
waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);
int signalingRecords = 1;
assertRecordsFromSnapshotAndStreamingArePresent(ROW_COUNT, consumeRecordsByTopic(ROW_COUNT + signalingRecords, 10));
insertRecords(ROW_COUNT, ROW_COUNT * 2);
assertStreamingRecordsArePresent(ROW_COUNT, consumeRecordsByTopic(ROW_COUNT, 10));
}
}

View File

@ -54,7 +54,7 @@ public Optional<String> buildSnapshotQuery(TableId tableId, List<String> snapsho
}
@Override
public String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo) {
public String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo, boolean isOnDemand) {
if (newSlotInfo != null) {
String snapSet = String.format("SET TRANSACTION SNAPSHOT '%s';", newSlotInfo.snapshotName());
return "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; \n" + snapSet;