DBZ-7732 Fix test wait condition

This commit is contained in:
mfvitale 2024-04-09 11:08:44 +02:00 committed by Jiri Pechanec
parent 64454cc378
commit 72b3b4c3c0

View File

@ -46,6 +46,7 @@
import io.debezium.kafka.KafkaCluster;
import io.debezium.pipeline.notification.channels.SinkNotificationChannel;
import io.debezium.pipeline.signal.actions.snapshotting.StopSnapshot;
import io.debezium.util.Testing;
public abstract class AbstractIncrementalSnapshotTest<T extends SourceConnector> extends AbstractSnapshotTest<T> {
@ -412,7 +413,7 @@ public void snapshotOnlyWithRestart() throws Exception {
// there shouldn't be any snapshot records
assertNoRecordsToConsume();
sendAdHocSnapshotSignal("s1.a", "s1.b");
sendAdHocSnapshotSignal();
final int expectedRecordCount = ROW_COUNT;
final AtomicInteger recordCounter = new AtomicInteger();
@ -435,11 +436,12 @@ public void snapshotOnlyWithRestart() throws Exception {
@Test
@FixFor("DBZ-7716")
public void whenSnapshotMultipleTableAndCorrectorRestartsThenOnlyNotAlreadyProcessedTableMustBeProcessed() throws Exception {
public void whenSnapshotMultipleTablesAndConnectorRestartsThenOnlyNotAlreadyProcessedTableMustBeProcessed() throws Exception {
// Testing.Print.enable();
populateTables();
final Configuration config = config()
.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 200)
.build();
startAndConsumeTillEnd(connectorClass(), config);
waitForConnectorToStart();
@ -448,22 +450,24 @@ public void whenSnapshotMultipleTableAndCorrectorRestartsThenOnlyNotAlreadyProce
// there shouldn't be any snapshot records
assertNoRecordsToConsume();
sendAdHocSnapshotSignal(tableNames().toArray(new String[0]));
sendAdHocSnapshotSignal(tableDataCollectionIds().toArray(new String[0]));
final int expectedRecordCount = ROW_COUNT * 2;
final AtomicInteger recordCounter = new AtomicInteger();
final AtomicBoolean restarted = new AtomicBoolean();
List<SourceRecord> dbChanges = new ArrayList<>();
consumeRecordsUntil((i, r) -> recordCounter.addAndGet(1) == expectedRecordCount,
consumeRecordsUntil((i, r) -> recordCounter.get() == expectedRecordCount,
(recordsConsumed, record) -> "",
5,
record -> {
Testing.print("Record counter " + recordCounter.get());
if (topicNames().contains(record.topic())) { // We want to exclude the changed from signal table
dbChanges.add(record);
if (!record.topic().contains(topicName()) &&
recordCounter.addAndGet(1) > 250
recordCounter.addAndGet(1) > 150
&& !restarted.get()) {
stopConnector();
assertConnectorNotRunning();