From 3890038c21c051f9e87926d9b4d25f53b137aa3e Mon Sep 17 00:00:00 2001 From: Igor Gabaydulin Date: Wed, 29 Jan 2020 13:54:39 +0300 Subject: [PATCH] DBZ-1727 Reduce heartbeat interval in test --- .../postgresql/RecordsStreamProducerIT.java | 61 +++++++++---------- 1 file changed, 29 insertions(+), 32 deletions(-) diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java index e8706b0a8..1f9574654 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java @@ -15,6 +15,7 @@ import static org.fest.assertions.Assertions.assertThat; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; import java.math.BigDecimal; import java.time.Instant; @@ -1098,9 +1099,6 @@ public void shouldPropagateSourceColumnTypeScaleToSchemaParameter() throws Excep @Test @FixFor("DBZ-800") public void shouldReceiveHeartbeatAlsoWhenChangingNonWhitelistedTable() throws Exception { - // the high heartbeat interval should make sure that a heartbeat message is emitted only - // after insert statement which allows to check that lsn is not flushed by itself - // but only when heartbeat message is produced startConnector(config -> config .with(Heartbeat.HEARTBEAT_INTERVAL, "100") .with(PostgresConnectorConfig.POLL_INTERVAL_MS, "50") @@ -1112,39 +1110,38 @@ public void shouldReceiveHeartbeatAlsoWhenChangingNonWhitelistedTable() throws E String statement = "CREATE SCHEMA s1;" + "CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));" + "CREATE TABLE s1.b (pk SERIAL, bb integer, PRIMARY KEY(pk));" + - "INSERT INTO s1.b (bb) VALUES (22);"; + "INSERT INTO s1.a (aa) VALUES (11);"; - TestHelper.execute(statement); - final AtomicInteger heartbeatCount = new AtomicInteger(); + // only heartbeats records + consumer = testConsumer(15); + consumer.setIgnoreExtraRecords(true); - Awaitility.await().atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS).until(() -> { - final SourceRecord record = consumeRecord(); - System.out.println(record); - if (record != null) { - if (record.topic().endsWith("s1.b")) { - assertRecordInserted(record, "s1.b", PK_FIELD, 1); - return true; - } - else { - assertHeartBeatRecord(record); - heartbeatCount.incrementAndGet(); - } + try (PostgresConnection postgresConnection = TestHelper.create()) { + + TestHelper.execute(statement); + + // check if client's lsn is not flushed yet + SlotState slotState = postgresConnection.getReplicationSlotState(Builder.DEFAULT_SLOT_NAME, TestHelper.decoderPlugin().getPostgresPluginName()); + long flushLsn = slotState.slotLastFlushedLsn(); + // serverLsn is the latest server lsn and is equal to insert statement lsn + long serverLsn = postgresConnection.currentXLogLocation(); + assertNotEquals("lsn should not be flushed until heartbeat is produced", serverLsn, flushLsn); + + // awaiting heartbeats to be produced + consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); + SourceRecord record = null; + while (!consumer.isEmpty()) { + record = consumer.remove(); } - return false; - }); - Assertions.assertThat(heartbeatCount.get()).isGreaterThan(0); + assertNotNull("heartbeats are not generated", record); - final Set lsn = new HashSet<>(); - TestHelper.execute("INSERT INTO s1.a (aa) VALUES (11);"); - Awaitility.await().atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS).until(() -> { - final SourceRecord record = consumeRecord(); - if (record != null) { - lsn.add((Long) record.sourceOffset().get("lsn")); - return lsn.size() >= 2; - } - return false; - }); - Assertions.assertThat(lsn.size()).isGreaterThanOrEqualTo(2); + long heartbeatLsn = (Long) record.sourceOffset().get("lsn"); + + // check if flushed lsn is equal to or greater than server lsn + SlotState slotStateAfterHeartbeat = postgresConnection.getReplicationSlotState(Builder.DEFAULT_SLOT_NAME, TestHelper.decoderPlugin().getPostgresPluginName()); + long flushedLsn = slotStateAfterHeartbeat.slotLastFlushedLsn(); + assertTrue("lsn should be flushed when heartbeat is produced", flushedLsn >= serverLsn); + } } @Test