DBZ-1727 Reduce heartbeat interval in test

This commit is contained in:
Igor Gabaydulin 2020-01-29 13:54:39 +03:00 committed by Jiri Pechanec
parent a2a6525c84
commit 3890038c21

View File

@ -15,6 +15,7 @@
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import java.math.BigDecimal;
import java.time.Instant;
@ -1098,9 +1099,6 @@ public void shouldPropagateSourceColumnTypeScaleToSchemaParameter() throws Excep
@Test
@FixFor("DBZ-800")
public void shouldReceiveHeartbeatAlsoWhenChangingNonWhitelistedTable() throws Exception {
// the high heartbeat interval should make sure that a heartbeat message is emitted only
// after insert statement which allows to check that lsn is not flushed by itself
// but only when heartbeat message is produced
startConnector(config -> config
.with(Heartbeat.HEARTBEAT_INTERVAL, "100")
.with(PostgresConnectorConfig.POLL_INTERVAL_MS, "50")
@ -1112,39 +1110,38 @@ public void shouldReceiveHeartbeatAlsoWhenChangingNonWhitelistedTable() throws E
String statement = "CREATE SCHEMA s1;" +
"CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));" +
"CREATE TABLE s1.b (pk SERIAL, bb integer, PRIMARY KEY(pk));" +
"INSERT INTO s1.b (bb) VALUES (22);";
"INSERT INTO s1.a (aa) VALUES (11);";
TestHelper.execute(statement);
final AtomicInteger heartbeatCount = new AtomicInteger();
// only heartbeats records
consumer = testConsumer(15);
consumer.setIgnoreExtraRecords(true);
Awaitility.await().atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS).until(() -> {
final SourceRecord record = consumeRecord();
System.out.println(record);
if (record != null) {
if (record.topic().endsWith("s1.b")) {
assertRecordInserted(record, "s1.b", PK_FIELD, 1);
return true;
}
else {
assertHeartBeatRecord(record);
heartbeatCount.incrementAndGet();
}
try (PostgresConnection postgresConnection = TestHelper.create()) {
TestHelper.execute(statement);
// check if client's lsn is not flushed yet
SlotState slotState = postgresConnection.getReplicationSlotState(Builder.DEFAULT_SLOT_NAME, TestHelper.decoderPlugin().getPostgresPluginName());
long flushLsn = slotState.slotLastFlushedLsn();
// serverLsn is the latest server lsn and is equal to insert statement lsn
long serverLsn = postgresConnection.currentXLogLocation();
assertNotEquals("lsn should not be flushed until heartbeat is produced", serverLsn, flushLsn);
// awaiting heartbeats to be produced
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
SourceRecord record = null;
while (!consumer.isEmpty()) {
record = consumer.remove();
}
return false;
});
Assertions.assertThat(heartbeatCount.get()).isGreaterThan(0);
assertNotNull("heartbeats are not generated", record);
final Set<Long> lsn = new HashSet<>();
TestHelper.execute("INSERT INTO s1.a (aa) VALUES (11);");
Awaitility.await().atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS).until(() -> {
final SourceRecord record = consumeRecord();
if (record != null) {
lsn.add((Long) record.sourceOffset().get("lsn"));
return lsn.size() >= 2;
}
return false;
});
Assertions.assertThat(lsn.size()).isGreaterThanOrEqualTo(2);
long heartbeatLsn = (Long) record.sourceOffset().get("lsn");
// check if flushed lsn is equal to or greater than server lsn
SlotState slotStateAfterHeartbeat = postgresConnection.getReplicationSlotState(Builder.DEFAULT_SLOT_NAME, TestHelper.decoderPlugin().getPostgresPluginName());
long flushedLsn = slotStateAfterHeartbeat.slotLastFlushedLsn();
assertTrue("lsn should be flushed when heartbeat is produced", flushedLsn >= serverLsn);
}
}
@Test