DBZ-8114: waitForAvailableRecords with records.waittime

This commit is contained in:
Lars M Johansson 2024-08-12 17:09:11 +02:00 committed by Jiri Pechanec
parent da143ef8b2
commit ba39a27b89
2 changed files with 10 additions and 10 deletions

View File

@ -340,7 +340,7 @@ public void updatesWithRestart() throws Exception {
startAndConsumeTillEnd(connectorClass(), config);
waitForConnectorToStart();
waitForAvailableRecords(1, TimeUnit.SECONDS);
waitForAvailableRecords(waitTimeForRecords(), TimeUnit.SECONDS);
// there shouldn't be any snapshot records
assertNoRecordsToConsume();
@ -410,7 +410,7 @@ public void snapshotOnlyWithRestart() throws Exception {
startAndConsumeTillEnd(connectorClass(), config);
waitForConnectorToStart();
waitForAvailableRecords(1, TimeUnit.SECONDS);
waitForAvailableRecords(waitTimeForRecords(), TimeUnit.SECONDS);
// there shouldn't be any snapshot records
assertNoRecordsToConsume();
@ -447,7 +447,7 @@ public void whenSnapshotMultipleTablesAndConnectorRestartsThenOnlyNotAlreadyProc
startAndConsumeTillEnd(connectorClass(), config);
waitForConnectorToStart();
waitForAvailableRecords(1, TimeUnit.SECONDS);
waitForAvailableRecords(waitTimeForRecords(), TimeUnit.SECONDS);
// there shouldn't be any snapshot records
assertNoRecordsToConsume();
@ -838,7 +838,7 @@ public void testPauseDuringSnapshot() throws Exception {
startConnector(x -> x.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 50));
waitForConnectorToStart();
waitForAvailableRecords(1, TimeUnit.SECONDS);
waitForAvailableRecords(waitTimeForRecords(), TimeUnit.SECONDS);
// there shouldn't be any snapshot records
assertNoRecordsToConsume();
@ -884,7 +884,7 @@ public void snapshotWithAdditionalCondition() throws Exception {
startAndConsumeTillEnd(connectorClass(), config);
waitForConnectorToStart();
waitForAvailableRecords(1, TimeUnit.SECONDS);
waitForAvailableRecords(waitTimeForRecords(), TimeUnit.SECONDS);
// there shouldn't be any snapshot records
assertNoRecordsToConsume();
@ -910,7 +910,7 @@ public void snapshotWithNewAdditionalConditionsField() throws Exception {
startAndConsumeTillEnd(connectorClass(), config);
waitForConnectorToStart();
waitForAvailableRecords(1, TimeUnit.SECONDS);
waitForAvailableRecords(waitTimeForRecords(), TimeUnit.SECONDS);
// there shouldn't be any snapshot records
assertNoRecordsToConsume();
@ -952,7 +952,7 @@ public void snapshotWithAdditionalConditionWithRestart() throws Exception {
startAndConsumeTillEnd(connectorClass(), config);
waitForConnectorToStart();
waitForAvailableRecords(1, TimeUnit.SECONDS);
waitForAvailableRecords(waitTimeForRecords(), TimeUnit.SECONDS);
// there shouldn't be any snapshot records
assertNoRecordsToConsume();
@ -1004,7 +1004,7 @@ public void snapshotWithAdditionalConditionWithSurrogateKey() throws Exception {
startAndConsumeTillEnd(connectorClass(), config);
waitForConnectorToStart();
waitForAvailableRecords(1, TimeUnit.SECONDS);
waitForAvailableRecords(waitTimeForRecords(), TimeUnit.SECONDS);
// there shouldn't be any snapshot records
assertNoRecordsToConsume();
@ -1029,7 +1029,7 @@ public void testNotification() throws Exception {
waitForConnectorToStart();
waitForAvailableRecords(1, TimeUnit.SECONDS);
waitForAvailableRecords(waitTimeForRecords(), TimeUnit.SECONDS);
waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task());

View File

@ -277,7 +277,7 @@ protected void startConnector(Function<Configuration.Builder, Configuration.Buil
start(connectorClass(), config, callback);
waitForConnectorToStart();
waitForAvailableRecords(5, TimeUnit.SECONDS);
waitForAvailableRecords(waitTimeForRecords(), TimeUnit.SECONDS);
if (expectNoRecords) {
// there shouldn't be any snapshot records
assertNoRecordsToConsume();