DBZ-815 Adding test for heartbeat events for Oracle connector
This commit is contained in:
parent
2d4320ccd5
commit
2a3c58f069
@ -5,6 +5,7 @@
|
||||
*/
|
||||
package io.debezium.connector.oracle;
|
||||
|
||||
import static junit.framework.TestCase.assertEquals;
|
||||
import static org.fest.assertions.Assertions.assertThat;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
@ -26,7 +27,9 @@
|
||||
import io.debezium.connector.oracle.OracleConnectorConfig.SnapshotMode;
|
||||
import io.debezium.connector.oracle.util.TestHelper;
|
||||
import io.debezium.data.VerifyRecord;
|
||||
import io.debezium.doc.FixFor;
|
||||
import io.debezium.embedded.AbstractConnectorTest;
|
||||
import io.debezium.heartbeat.Heartbeat;
|
||||
import io.debezium.relational.RelationalDatabaseConnectorConfig;
|
||||
import io.debezium.util.Testing;
|
||||
|
||||
@ -325,6 +328,48 @@ public void shouldReadChangeStreamForTableCreatedWhileStreaming() throws Excepti
|
||||
assertThat(after.get("REGISTERED")).isEqualTo(toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 2, 22, 0, 0, 0)));
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-800")
|
||||
public void shouldReceiveHeartbeatAlsoWhenChangingNonWhitelistedTable() throws Exception {
|
||||
TestHelper.dropTable(connection, "debezium.dbz800a");
|
||||
TestHelper.dropTable(connection, "debezium.dbz800b");
|
||||
|
||||
// the low heartbeat interval should make sure that a heartbeat message is emitted after each change record
|
||||
// received from Postgres
|
||||
Configuration config = TestHelper.defaultConfig()
|
||||
.with(Heartbeat.HEARTBEAT_INTERVAL, "1")
|
||||
.with(OracleConnectorConfig.TABLE_WHITELIST, "ORCLPDB1\\.DEBEZIUM\\.DBZ800B")
|
||||
.build();
|
||||
|
||||
start(OracleConnector.class, config);
|
||||
assertConnectorIsRunning();
|
||||
|
||||
Thread.sleep(1000);
|
||||
|
||||
connection.execute("CREATE TABLE debezium.dbz800a (id NUMBER(9) NOT NULL, aaa VARCHAR2(100), PRIMARY KEY (id) )");
|
||||
connection.execute("CREATE TABLE debezium.dbz800b (id NUMBER(9) NOT NULL, bbb VARCHAR2(100), PRIMARY KEY (id) )");
|
||||
connection.execute("INSERT INTO debezium.dbz800a VALUES (1, 'AAA')");
|
||||
connection.execute("INSERT INTO debezium.dbz800b VALUES (2, 'BBB')");
|
||||
connection.execute("COMMIT");
|
||||
|
||||
// expecting two heartbeat records and one actual change record
|
||||
List<SourceRecord> records = consumeRecordsByTopic(3).allRecordsInOrder();
|
||||
|
||||
// expecting no change record for s1.a but a heartbeat
|
||||
verifyHeartbeatRecord(records.get(0));
|
||||
|
||||
// and then a change record for s1.b and a heartbeat
|
||||
VerifyRecord.isValidInsert(records.get(1), "ID", 2);
|
||||
verifyHeartbeatRecord(records.get(2));
|
||||
}
|
||||
|
||||
private void verifyHeartbeatRecord(SourceRecord heartbeat) {
|
||||
assertEquals("__debezium-heartbeat.server1", heartbeat.topic());
|
||||
|
||||
Struct key = (Struct) heartbeat.key();
|
||||
assertThat(key.get("serverName")).isEqualTo("server1");
|
||||
}
|
||||
|
||||
private long toMicroSecondsSinceEpoch(LocalDateTime localDateTime) {
|
||||
return localDateTime.toEpochSecond(ZoneOffset.UTC) * MICROS_PER_SECOND;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user