DBZ-1052 Stabilize test for changed heartbeats

This commit is contained in:
Jiri Pechanec 2020-01-31 15:43:34 +01:00 committed by Gunnar Morling
parent 010e0c6fa6
commit 34d2979123
2 changed files with 25 additions and 15 deletions

View File

@ -879,9 +879,11 @@ public void shouldFlushLsnOnEmptyMessage() throws InterruptedException, SQLExcep
final Set<String> flushLsn = new HashSet<>(); final Set<String> flushLsn = new HashSet<>();
TestHelper.execute(INSERT_STMT); TestHelper.execute(INSERT_STMT);
Awaitility.await().atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS).until(() -> {
final SourceRecords actualRecords = consumeRecordsByTopic(1); final SourceRecords actualRecords = consumeRecordsByTopic(1);
assertThat(actualRecords.topics().size()).isEqualTo(1); final List<SourceRecord> topicRecords = actualRecords.recordsForTopic(topicName("s1.a"));
assertThat(actualRecords.recordsForTopic(topicName("s1.a")).size()).isEqualTo(1); return topicRecords != null && topicRecords.size() == 1;
});
try (final PostgresConnection connection = TestHelper.create()) { try (final PostgresConnection connection = TestHelper.create()) {
flushLsn.add(getConfirmedFlushLsn(connection)); flushLsn.add(getConfirmedFlushLsn(connection));

View File

@ -8,7 +8,6 @@
import static io.debezium.connector.postgresql.TestHelper.PK_FIELD; import static io.debezium.connector.postgresql.TestHelper.PK_FIELD;
import static io.debezium.connector.postgresql.TestHelper.topicName; import static io.debezium.connector.postgresql.TestHelper.topicName;
import static io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIs.DecoderPluginName.DECODERBUFS;
import static io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIs.DecoderPluginName.PGOUTPUT; import static io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIs.DecoderPluginName.PGOUTPUT;
import static io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot.DecoderPluginName.WAL2JSON; import static io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot.DecoderPluginName.WAL2JSON;
import static junit.framework.TestCase.assertEquals; import static junit.framework.TestCase.assertEquals;
@ -22,8 +21,10 @@
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -1337,29 +1338,36 @@ public void shouldReceiveChangesForReplicaIdentityFullTableWithToastedValueTable
@Test() @Test()
@FixFor("DBZ-1181") @FixFor("DBZ-1181")
@SkipWhenDecoderPluginNameIs(value = DECODERBUFS, reason = "")
public void testEmptyChangesProducesHeartbeat() throws Exception { public void testEmptyChangesProducesHeartbeat() throws Exception {
// the low heartbeat interval should make sure that a heartbeat message is emitted after each change record // the low heartbeat interval should make sure that a heartbeat message is emitted after each change record
// received from Postgres // received from Postgres
startConnector(config -> config.with(Heartbeat.HEARTBEAT_INTERVAL, "1")); startConnector(config -> config.with(Heartbeat.HEARTBEAT_INTERVAL, "100"));
// Expecting 1 heartbeat + 1 data change TestHelper.execute(
consumer.expects(1 + 1);
executeAndWait(
"DROP TABLE IF EXISTS test_table;" + "DROP TABLE IF EXISTS test_table;" +
"CREATE TABLE test_table (id SERIAL, text TEXT);" + "CREATE TABLE test_table (id SERIAL, text TEXT);" +
"INSERT INTO test_table (text) VALUES ('mydata');"); "INSERT INTO test_table (text) VALUES ('mydata');");
consumer.clear();
consumer.expects(1); // Expecting 1 data change
Awaitility.await().atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS).until(() -> {
final SourceRecord record = consumeRecord();
return record != null && record.valueSchema().name().endsWith(".Envelope");
});
// Expecting one empty DDL change // Expecting one empty DDL change
String statement = "CREATE SCHEMA s1;"; String statement = "CREATE SCHEMA s1;";
executeAndWait(statement); TestHelper.execute(statement);
// Expecting one heartbeat for the empty DDL change // Expecting changes for the empty DDL change
assertHeartBeatRecordInserted(); final Set<Long> lsns = new HashSet<>();
Awaitility.await().atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS).until(() -> {
final SourceRecord record = consumeRecord();
Assertions.assertThat(record.valueSchema().name()).endsWith(".Heartbeat");
lsns.add((Long) record.sourceOffset().get("lsn"));
// CREATE SCHEMA should change LSN
return lsns.size() > 1;
});
assertThat(consumer.isEmpty()).isTrue(); assertThat(consumer.isEmpty()).isTrue();
} }