DBZ-7917 Disable ReadOnlyIncrementalSnapshotIT for PostgreSQL < 13

This commit is contained in:
mfvitale 2024-06-12 12:35:52 +02:00 committed by Jiri Pechanec
parent 05a04451ec
commit 8edb87a881
3 changed files with 11 additions and 1 deletions

View File

@ -223,7 +223,7 @@ private void processMessages(ChangeEventSourceContext context, PostgresPartition
lsnFlushingAllowed = true;
}
else {
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
dispatcher.dispatchHeartbeatEventAlsoToIncrementalSnapshot(partition, offsetContext);
noMessageIterations++;
if (noMessageIterations >= THROTTLE_NO_MESSAGE_BEFORE_PAUSE) {
noMessageIterations = 0;

View File

@ -5,6 +5,7 @@
*/
package io.debezium.connector.postgresql;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Arrays;
@ -21,6 +22,7 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.signal.actions.AbstractSnapshotSignal;
import io.debezium.pipeline.signal.actions.snapshotting.ExecuteSnapshot;
@ -29,6 +31,7 @@
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.util.Strings;
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 13, minor = 0, reason = "Function pg_current_snapshot() not supported until PostgreSQL 13")
public class ReadOnlyIncrementalSnapshotIT extends IncrementalSnapshotIT {
private final LogInterceptor executeSignalInterceptor = new LogInterceptor(ExecuteSnapshot.class);

View File

@ -424,6 +424,13 @@ public void dispatchHeartbeatEvent(P partition, OffsetContext offset) throws Int
partition.getSourcePartition(),
offset.getOffset(),
this::enqueueHeartbeat);
}
public void dispatchHeartbeatEventAlsoToIncrementalSnapshot(P partition, OffsetContext offset) throws InterruptedException {
heartbeat.heartbeat(
partition.getSourcePartition(),
offset.getOffset(),
this::enqueueHeartbeat);
if (incrementalSnapshotChangeEventSource != null) {
incrementalSnapshotChangeEventSource.processHeartbeat(partition, offset);