diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index eb21a4cae..a6c8afe41 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -879,9 +879,11 @@ public void shouldFlushLsnOnEmptyMessage() throws InterruptedException, SQLExcep final Set flushLsn = new HashSet<>(); TestHelper.execute(INSERT_STMT); - final SourceRecords actualRecords = consumeRecordsByTopic(1); - assertThat(actualRecords.topics().size()).isEqualTo(1); - assertThat(actualRecords.recordsForTopic(topicName("s1.a")).size()).isEqualTo(1); + Awaitility.await().atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS).until(() -> { + final SourceRecords actualRecords = consumeRecordsByTopic(1); + final List topicRecords = actualRecords.recordsForTopic(topicName("s1.a")); + return topicRecords != null && topicRecords.size() == 1; + }); try (final PostgresConnection connection = TestHelper.create()) { flushLsn.add(getConfirmedFlushLsn(connection)); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java index 51929c939..1eb32b3f9 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java @@ -8,7 +8,6 @@ import static io.debezium.connector.postgresql.TestHelper.PK_FIELD; import static io.debezium.connector.postgresql.TestHelper.topicName; -import static io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIs.DecoderPluginName.DECODERBUFS; import static io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIs.DecoderPluginName.PGOUTPUT; import static io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot.DecoderPluginName.WAL2JSON; import static junit.framework.TestCase.assertEquals; @@ -22,8 +21,10 @@ import java.time.ZoneOffset; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -1337,29 +1338,36 @@ public void shouldReceiveChangesForReplicaIdentityFullTableWithToastedValueTable @Test() @FixFor("DBZ-1181") - @SkipWhenDecoderPluginNameIs(value = DECODERBUFS, reason = "") public void testEmptyChangesProducesHeartbeat() throws Exception { // the low heartbeat interval should make sure that a heartbeat message is emitted after each change record // received from Postgres - startConnector(config -> config.with(Heartbeat.HEARTBEAT_INTERVAL, "1")); + startConnector(config -> config.with(Heartbeat.HEARTBEAT_INTERVAL, "100")); - // Expecting 1 heartbeat + 1 data change - consumer.expects(1 + 1); - - executeAndWait( + TestHelper.execute( "DROP TABLE IF EXISTS test_table;" + "CREATE TABLE test_table (id SERIAL, text TEXT);" + "INSERT INTO test_table (text) VALUES ('mydata');"); - consumer.clear(); - consumer.expects(1); + // Expecting 1 data change + Awaitility.await().atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS).until(() -> { + final SourceRecord record = consumeRecord(); + return record != null && record.valueSchema().name().endsWith(".Envelope"); + }); + // Expecting one empty DDL change String statement = "CREATE SCHEMA s1;"; - executeAndWait(statement); + TestHelper.execute(statement); - // Expecting one heartbeat for the empty DDL change - assertHeartBeatRecordInserted(); + // Expecting changes for the empty DDL change + final Set lsns = new HashSet<>(); + Awaitility.await().atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS).until(() -> { + final SourceRecord record = consumeRecord(); + Assertions.assertThat(record.valueSchema().name()).endsWith(".Heartbeat"); + lsns.add((Long) record.sourceOffset().get("lsn")); + // CREATE SCHEMA should change LSN + return lsns.size() > 1; + }); assertThat(consumer.isEmpty()).isTrue(); }