DBZ-1698 Fix test timing issue

This commit is contained in:
Jiri Pechanec 2020-02-03 15:05:20 +01:00 committed by Gunnar Morling
parent 2a6ea1c41f
commit 1d9ac71a9e

View File

@ -26,6 +26,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -1056,7 +1057,7 @@ public void shouldReceiveHeartbeatAlsoWhenChangingNonWhitelistedTable() throws E
// the low heartbeat interval should make sure that a heartbeat message is emitted after each change record
// received from Postgres
startConnector(config -> config
.with(Heartbeat.HEARTBEAT_INTERVAL, "1")
.with(Heartbeat.HEARTBEAT_INTERVAL, "100")
.with(PostgresConnectorConfig.POLL_INTERVAL_MS, "50")
.with(PostgresConnectorConfig.TABLE_WHITELIST, "s1\\.b")
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER),
@ -1066,33 +1067,39 @@ public void shouldReceiveHeartbeatAlsoWhenChangingNonWhitelistedTable() throws E
String statement = "CREATE SCHEMA s1;" +
"CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));" +
"CREATE TABLE s1.b (pk SERIAL, bb integer, PRIMARY KEY(pk));" +
"INSERT INTO s1.a (aa) VALUES (11);" +
"INSERT INTO s1.b (bb) VALUES (22);";
// streaming from database is non-blocking so we should receive many heartbeats
final int expectedAtMostStartHeartbeats = 10;
final int expectedHeartbeats = 5;
// heartbeat for unfiltered table, data change, heartbeats
consumer = testConsumer(expectedAtMostStartHeartbeats + 1 + expectedHeartbeats);
consumer.setIgnoreExtraRecords(true);
executeAndWait(statement);
TestHelper.execute(statement);
final AtomicInteger heartbeatCount = new AtomicInteger();
// change record for s1.b and heartbeats
Optional<SourceRecord> record;
int startHeartbeats = 0;
while (true) {
record = isHeartBeatRecordInserted();
if (record.isPresent()) {
assertThat(startHeartbeats).describedAs("Too many start heartbeats").isLessThanOrEqualTo(expectedAtMostStartHeartbeats);
break;
Awaitility.await().atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS).until(() -> {
final SourceRecord record = consumeRecord();
System.out.println(record);
if (record != null) {
if (record.topic().endsWith("s1.b")) {
assertRecordInserted(record, "s1.b", PK_FIELD, 1);
return true;
}
else {
assertHeartBeatRecord(record);
heartbeatCount.incrementAndGet();
}
}
startHeartbeats++;
}
return false;
});
Assertions.assertThat(heartbeatCount.get()).isGreaterThan(0);
assertRecordInserted(record.get(), "s1.b", PK_FIELD, 1);
for (int i = 0; i < expectedHeartbeats; i++) {
assertHeartBeatRecordInserted();
}
final Set<Long> lsn = new HashSet<>();
TestHelper.execute("INSERT INTO s1.a (aa) VALUES (11);");
Awaitility.await().atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS).until(() -> {
final SourceRecord record = consumeRecord();
if (record != null) {
lsn.add((Long) record.sourceOffset().get("lsn"));
return lsn.size() >= 2;
}
return false;
});
Assertions.assertThat(lsn.size()).isGreaterThanOrEqualTo(2);
}
@Test
@ -2024,7 +2031,10 @@ private void testReceiveChangesForReplicaIdentityFullTableWithToastedValue(Postg
private void assertHeartBeatRecordInserted() {
assertFalse("records not generated", consumer.isEmpty());
SourceRecord heartbeat = consumer.remove();
assertHeartBeatRecord(consumer.remove());
}
private void assertHeartBeatRecord(SourceRecord heartbeat) {
assertEquals("__debezium-heartbeat." + TestHelper.TEST_SERVER, heartbeat.topic());
Struct key = (Struct) heartbeat.key();