From 2a3c58f069cd2c1d5a0da243d84a26f2cae3a509 Mon Sep 17 00:00:00 2001 From: Gunnar Morling Date: Mon, 23 Jul 2018 17:13:19 +0200 Subject: [PATCH] DBZ-815 Adding test for heartbeat events for Oracle connector --- .../connector/oracle/OracleConnectorIT.java | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java index 0c52353fa..e4950d10a 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java @@ -5,6 +5,7 @@ */ package io.debezium.connector.oracle; +import static junit.framework.TestCase.assertEquals; import static org.fest.assertions.Assertions.assertThat; import java.math.BigDecimal; @@ -26,7 +27,9 @@ import io.debezium.connector.oracle.OracleConnectorConfig.SnapshotMode; import io.debezium.connector.oracle.util.TestHelper; import io.debezium.data.VerifyRecord; +import io.debezium.doc.FixFor; import io.debezium.embedded.AbstractConnectorTest; +import io.debezium.heartbeat.Heartbeat; import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.util.Testing; @@ -325,6 +328,48 @@ public void shouldReadChangeStreamForTableCreatedWhileStreaming() throws Excepti assertThat(after.get("REGISTERED")).isEqualTo(toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 2, 22, 0, 0, 0))); } + @Test + @FixFor("DBZ-800") + public void shouldReceiveHeartbeatAlsoWhenChangingNonWhitelistedTable() throws Exception { + TestHelper.dropTable(connection, "debezium.dbz800a"); + TestHelper.dropTable(connection, "debezium.dbz800b"); + + // the low heartbeat interval should make sure that a heartbeat message is emitted after each change record + // received from Postgres + Configuration config = TestHelper.defaultConfig() + .with(Heartbeat.HEARTBEAT_INTERVAL, "1") + .with(OracleConnectorConfig.TABLE_WHITELIST, "ORCLPDB1\\.DEBEZIUM\\.DBZ800B") + .build(); + + start(OracleConnector.class, config); + assertConnectorIsRunning(); + + Thread.sleep(1000); + + connection.execute("CREATE TABLE debezium.dbz800a (id NUMBER(9) NOT NULL, aaa VARCHAR2(100), PRIMARY KEY (id) )"); + connection.execute("CREATE TABLE debezium.dbz800b (id NUMBER(9) NOT NULL, bbb VARCHAR2(100), PRIMARY KEY (id) )"); + connection.execute("INSERT INTO debezium.dbz800a VALUES (1, 'AAA')"); + connection.execute("INSERT INTO debezium.dbz800b VALUES (2, 'BBB')"); + connection.execute("COMMIT"); + + // expecting two heartbeat records and one actual change record + List records = consumeRecordsByTopic(3).allRecordsInOrder(); + + // expecting no change record for s1.a but a heartbeat + verifyHeartbeatRecord(records.get(0)); + + // and then a change record for s1.b and a heartbeat + VerifyRecord.isValidInsert(records.get(1), "ID", 2); + verifyHeartbeatRecord(records.get(2)); + } + + private void verifyHeartbeatRecord(SourceRecord heartbeat) { + assertEquals("__debezium-heartbeat.server1", heartbeat.topic()); + + Struct key = (Struct) heartbeat.key(); + assertThat(key.get("serverName")).isEqualTo("server1"); + } + private long toMicroSecondsSinceEpoch(LocalDateTime localDateTime) { return localDateTime.toEpochSecond(ZoneOffset.UTC) * MICROS_PER_SECOND; }