From 1d9ac71a9eddd11a981946e514eea8f8618bd012 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Mon, 3 Feb 2020 15:05:20 +0100 Subject: [PATCH] DBZ-1698 Fix test timing issue --- .../postgresql/RecordsStreamProducerIT.java | 58 +++++++++++-------- 1 file changed, 34 insertions(+), 24 deletions(-) diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java index 351940680..1f3007045 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java @@ -26,6 +26,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -1056,7 +1057,7 @@ public void shouldReceiveHeartbeatAlsoWhenChangingNonWhitelistedTable() throws E // the low heartbeat interval should make sure that a heartbeat message is emitted after each change record // received from Postgres startConnector(config -> config - .with(Heartbeat.HEARTBEAT_INTERVAL, "1") + .with(Heartbeat.HEARTBEAT_INTERVAL, "100") .with(PostgresConnectorConfig.POLL_INTERVAL_MS, "50") .with(PostgresConnectorConfig.TABLE_WHITELIST, "s1\\.b") .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER), @@ -1066,33 +1067,39 @@ public void shouldReceiveHeartbeatAlsoWhenChangingNonWhitelistedTable() throws E String statement = "CREATE SCHEMA s1;" + "CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));" + "CREATE TABLE s1.b (pk SERIAL, bb integer, PRIMARY KEY(pk));" + - "INSERT INTO s1.a (aa) VALUES (11);" + "INSERT INTO s1.b (bb) VALUES (22);"; - // streaming from database is non-blocking so we should receive many heartbeats - final int expectedAtMostStartHeartbeats = 10; - final int expectedHeartbeats = 5; - // heartbeat for unfiltered table, data change, heartbeats - consumer = testConsumer(expectedAtMostStartHeartbeats + 1 + expectedHeartbeats); - consumer.setIgnoreExtraRecords(true); - executeAndWait(statement); + TestHelper.execute(statement); + final AtomicInteger heartbeatCount = new AtomicInteger(); - // change record for s1.b and heartbeats - Optional record; - int startHeartbeats = 0; - while (true) { - record = isHeartBeatRecordInserted(); - if (record.isPresent()) { - assertThat(startHeartbeats).describedAs("Too many start heartbeats").isLessThanOrEqualTo(expectedAtMostStartHeartbeats); - break; + Awaitility.await().atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS).until(() -> { + final SourceRecord record = consumeRecord(); + System.out.println(record); + if (record != null) { + if (record.topic().endsWith("s1.b")) { + assertRecordInserted(record, "s1.b", PK_FIELD, 1); + return true; + } + else { + assertHeartBeatRecord(record); + heartbeatCount.incrementAndGet(); + } } - startHeartbeats++; - } + return false; + }); + Assertions.assertThat(heartbeatCount.get()).isGreaterThan(0); - assertRecordInserted(record.get(), "s1.b", PK_FIELD, 1); - for (int i = 0; i < expectedHeartbeats; i++) { - assertHeartBeatRecordInserted(); - } + final Set lsn = new HashSet<>(); + TestHelper.execute("INSERT INTO s1.a (aa) VALUES (11);"); + Awaitility.await().atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS).until(() -> { + final SourceRecord record = consumeRecord(); + if (record != null) { + lsn.add((Long) record.sourceOffset().get("lsn")); + return lsn.size() >= 2; + } + return false; + }); + Assertions.assertThat(lsn.size()).isGreaterThanOrEqualTo(2); } @Test @@ -2024,7 +2031,10 @@ private void testReceiveChangesForReplicaIdentityFullTableWithToastedValue(Postg private void assertHeartBeatRecordInserted() { assertFalse("records not generated", consumer.isEmpty()); - SourceRecord heartbeat = consumer.remove(); + assertHeartBeatRecord(consumer.remove()); + } + + private void assertHeartBeatRecord(SourceRecord heartbeat) { assertEquals("__debezium-heartbeat." + TestHelper.TEST_SERVER, heartbeat.topic()); Struct key = (Struct) heartbeat.key();