DBZ-7917 Fix ReadOnlyIncrementalSnapshotIT.java test

This commit is contained in:
mfvitale 2024-06-11 16:33:31 +02:00 committed by Jiri Pechanec
parent 9704f0c3e0
commit 05a04451ec
2 changed files with 14 additions and 5 deletions

View File

@ -12,6 +12,7 @@
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
@ -19,6 +20,7 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.signal.actions.AbstractSnapshotSignal;
import io.debezium.pipeline.signal.actions.snapshotting.ExecuteSnapshot;
@ -209,4 +211,10 @@ var record = records.get(i);
assertThat(((Struct) record.value()).getStruct("after").getInt32("aa")).isEqualTo(i);
}
}
@Override
protected Function<Configuration.Builder, Configuration.Builder> additionalConfiguration() {
return x -> x.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1)
.with(Heartbeat.HEARTBEAT_INTERVAL_PROPERTY_NAME, 5000);
}
}

View File

@ -39,7 +39,6 @@
import io.debezium.config.Configuration;
import io.debezium.data.Envelope;
import io.debezium.doc.FixFor;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenConnectorUnderTest;
@ -584,8 +583,7 @@ public void stopCurrentIncrementalSnapshotWithoutCollectionsAndTakeNewNewIncreme
// we are still within the incremental snapshot rather than it being performed with one
// round trip to the database
populateTable();
startConnector(x -> x.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1)
.with(Heartbeat.HEARTBEAT_INTERVAL_PROPERTY_NAME, 5000));
startConnector(additionalConfiguration());
// Send ad-hoc start incremental snapshot signal and wait for incremental snapshots to start
sendAdHocSnapshotSignalAndWait();
@ -628,6 +626,10 @@ public void stopCurrentIncrementalSnapshotWithoutCollectionsAndTakeNewNewIncreme
}
}
protected Function<Configuration.Builder, Configuration.Builder> additionalConfiguration() {
return x -> x.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1);
}
@Test
@FixFor("DBZ-4271")
public void stopCurrentIncrementalSnapshotWithAllCollectionsAndTakeNewNewIncrementalSnapshotAfterRestart() throws Exception {
@ -640,8 +642,7 @@ public void stopCurrentIncrementalSnapshotWithAllCollectionsAndTakeNewNewIncreme
// we are still within the incremental snapshot rather than it being performed with one
// round trip to the database
populateTable();
startConnector(x -> x.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1)
.with(Heartbeat.HEARTBEAT_INTERVAL_PROPERTY_NAME, 5000));
startConnector(additionalConfiguration());
// Send ad-hoc start incremental snapshot signal and wait for incremental snapshots to start
sendAdHocSnapshotSignalAndWait();