DBZ-6528 Fix Oracle NPE when storing only captured table schemas with signals
This commit is contained in:
parent
ab28e12d2e
commit
ef8059f3ff
@ -13,6 +13,7 @@
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
@ -31,6 +32,7 @@
|
||||
import io.debezium.relational.Tables;
|
||||
import io.debezium.schema.SchemaChangeEvent;
|
||||
import io.debezium.util.Clock;
|
||||
import io.debezium.util.Strings;
|
||||
|
||||
/**
|
||||
* A {@link StreamingChangeEventSource} for Oracle.
|
||||
@ -256,6 +258,15 @@ protected Optional<String> getSnapshotSelect(RelationalSnapshotContext<OraclePar
|
||||
return Optional.of(String.format("SELECT %s FROM %s AS OF SCN %s", snapshotSelectColumns, quote(tableId), snapshotOffset));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<Pattern> getSignalDataCollectionPattern(String signalingDataCollection) {
|
||||
// Oracle expects this value to be supplied using "<database>.<schema>.<table>"; however the
|
||||
// TableIdMapper used by the connector uses only "<schema>.<table>". This primarily targets
|
||||
// a fix for this specific use case as a much larger refactor is likely necessary long term.
|
||||
final TableId tableId = TableId.parse(signalingDataCollection);
|
||||
return Strings.listOfRegex(tableId.schema() + "." + tableId.table(), Pattern.CASE_INSENSITIVE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (connectorConfig.getPdbName() != null) {
|
||||
|
@ -5311,6 +5311,52 @@ public void shouldRestartOracleJdbcConnectionUponLogSwitch() throws Exception {
|
||||
stopConnector();
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-6528")
|
||||
public void shouldNotFailToStartWhenSignalDataCollectionNotDefinedWithinTableIncludeList() throws Exception {
|
||||
try {
|
||||
TestHelper.dropTable(connection, "dbz6528");
|
||||
TestHelper.dropTable(connection, "dbz6495");
|
||||
|
||||
try (OracleConnection admin = TestHelper.adminConnection()) {
|
||||
admin.setSessionToPdb(TestHelper.DATABASE);
|
||||
TestHelper.dropTable(admin, "c##dbzuser.signals");
|
||||
admin.execute("CREATE TABLE c##dbzuser.signals (id varchar2(64), type varchar2(32), data varchar2(2048))");
|
||||
TestHelper.streamTable(admin, "c##dbzuser.signals");
|
||||
}
|
||||
|
||||
connection.execute("CREATE TABLE dbz6528 (id numeric(9,0), data varchar2(50))");
|
||||
TestHelper.streamTable(connection, "dbz6528");
|
||||
|
||||
Configuration config = TestHelper.defaultConfig()
|
||||
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ6528")
|
||||
.with(OracleConnectorConfig.SIGNAL_DATA_COLLECTION, TestHelper.getDatabaseName() + ".C##DBZUSER.SIGNALS")
|
||||
.with(OracleConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, "true")
|
||||
.with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY.getValue())
|
||||
.build();
|
||||
|
||||
start(OracleConnector.class, config);
|
||||
assertConnectorIsRunning();
|
||||
|
||||
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
|
||||
|
||||
connection.execute("INSERT INTO dbz6528 (id,data) values (1, 'data')");
|
||||
|
||||
SourceRecords records = consumeRecordsByTopic(1);
|
||||
List<SourceRecord> tableRecords = records.recordsForTopic("server1.DEBEZIUM.DBZ6528");
|
||||
assertThat(tableRecords).hasSize(1);
|
||||
|
||||
stopConnector();
|
||||
}
|
||||
finally {
|
||||
TestHelper.dropTable(connection, "dbz6528");
|
||||
try (OracleConnection admin = TestHelper.adminConnection()) {
|
||||
admin.setSessionToPdb(TestHelper.DATABASE);
|
||||
TestHelper.dropTable(connection, "c##dbzuser.signals");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void waitForCurrentScnToHaveBeenSeenByConnector() throws SQLException {
|
||||
try (OracleConnection admin = TestHelper.adminConnection(true)) {
|
||||
final Scn scn = admin.getCurrentScn();
|
||||
|
@ -232,6 +232,10 @@ protected void connectionCreated(RelationalSnapshotContext<P, O> snapshotContext
|
||||
protected void connectionPoolConnectionCreated(RelationalSnapshotContext<P, O> snapshotContext, JdbcConnection connection) throws SQLException {
|
||||
}
|
||||
|
||||
protected List<Pattern> getSignalDataCollectionPattern(String signalingDataCollection) {
|
||||
return Strings.listOfRegex(signalingDataCollection, Pattern.CASE_INSENSITIVE);
|
||||
}
|
||||
|
||||
private Stream<TableId> toTableIds(Set<TableId> tableIds, Pattern pattern) {
|
||||
return tableIds
|
||||
.stream()
|
||||
@ -247,7 +251,7 @@ private Set<TableId> addSignalingCollectionAndSort(Set<TableId> capturedTables)
|
||||
captureTablePatterns.addAll(Strings.listOfRegex(tableIncludeList, Pattern.CASE_INSENSITIVE));
|
||||
}
|
||||
if (!Strings.isNullOrBlank(signalingDataCollection)) {
|
||||
captureTablePatterns.addAll(Strings.listOfRegex(signalingDataCollection, Pattern.CASE_INSENSITIVE));
|
||||
captureTablePatterns.addAll(getSignalDataCollectionPattern(signalingDataCollection));
|
||||
}
|
||||
if (captureTablePatterns.size() > 0) {
|
||||
return captureTablePatterns
|
||||
|
Loading…
Reference in New Issue
Block a user