DBZ-800 Adding test for receiving heartbeat events with Postgres
This commit is contained in:
parent
fe53360f30
commit
8e1450bc71
@ -9,6 +9,7 @@
|
||||
import static io.debezium.connector.postgresql.TestHelper.PK_FIELD;
|
||||
import static io.debezium.connector.postgresql.TestHelper.topicName;
|
||||
import static junit.framework.TestCase.assertEquals;
|
||||
import static org.fest.assertions.Assertions.assertThat;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
@ -34,6 +35,7 @@
|
||||
import io.debezium.data.VariableScaleDecimal;
|
||||
import io.debezium.data.VerifyRecord;
|
||||
import io.debezium.doc.FixFor;
|
||||
import io.debezium.heartbeat.Heartbeat;
|
||||
import io.debezium.junit.ConditionalFail;
|
||||
import io.debezium.junit.ShouldFailWhen;
|
||||
import io.debezium.relational.TableId;
|
||||
@ -658,6 +660,48 @@ public void shouldPropagateSourceColumnTypeToSchemaParameter() throws Exception
|
||||
assertInsert(INSERT_STRING_TYPES_STMT, schemasAndValuesForStringTypesWithSourceColumnTypeInfo());
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-800")
|
||||
public void shouldReceiveHeartbeatAlsoWhenChangingNonWhitelistedTable() throws Exception {
|
||||
// the low heartbeat interval should make sure that a heartbeat message is emitted after each change record
|
||||
// received from Postgres
|
||||
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
|
||||
.with(Heartbeat.HEARTBEAT_INTERVAL, "1")
|
||||
.with(PostgresConnectorConfig.TABLE_WHITELIST, "s1\\.b")
|
||||
.build());
|
||||
setupRecordsProducer(config);
|
||||
|
||||
String statement = "CREATE SCHEMA s1;" +
|
||||
"CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));" +
|
||||
"CREATE TABLE s1.b (pk SERIAL, bb integer, PRIMARY KEY(pk));" +
|
||||
"INSERT INTO s1.a (aa) VALUES (11);" +
|
||||
"INSERT INTO s1.b (bb) VALUES (22);";
|
||||
|
||||
// expecting two heartbeat records and one actual change record
|
||||
consumer = testConsumer(3);
|
||||
recordsProducer.start(consumer, blackHole);
|
||||
executeAndWait(statement);
|
||||
|
||||
// expecting no change record for s1.a but a heartbeat
|
||||
assertHeartBeatRecordInserted();
|
||||
|
||||
// and then a change record for s1.b and a heartbeat
|
||||
assertRecordInserted("s1.b", PK_FIELD, 1);
|
||||
assertHeartBeatRecordInserted();
|
||||
|
||||
assertThat(consumer.isEmpty()).isTrue();
|
||||
}
|
||||
|
||||
private void assertHeartBeatRecordInserted() {
|
||||
assertFalse("records not generated", consumer.isEmpty());
|
||||
|
||||
SourceRecord heartbeat = consumer.remove();
|
||||
assertEquals("__debezium-heartbeat." + TestHelper.TEST_SERVER, heartbeat.topic());
|
||||
|
||||
Struct key = (Struct) heartbeat.key();
|
||||
assertThat(key.get("serverName")).isEqualTo(TestHelper.TEST_SERVER);
|
||||
}
|
||||
|
||||
private void setupRecordsProducer(PostgresConnectorConfig config) {
|
||||
if (recordsProducer != null) {
|
||||
recordsProducer.stop();
|
||||
|
Loading…
Reference in New Issue
Block a user