DBZ-7942 Address parsing signal tables with special characters
This commit is contained in:
parent
dbb11231d3
commit
c06b1ce0be
@ -16,6 +16,8 @@
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
@ -838,6 +840,16 @@ public RedoThreadState getRedoThreadState() throws SQLException {
|
||||
}
|
||||
}
|
||||
|
||||
public List<String> getSQLKeywords() {
|
||||
try {
|
||||
return Arrays.asList(connection().getMetaData().getSQLKeywords().split(","));
|
||||
}
|
||||
catch (SQLException e) {
|
||||
LOGGER.debug("Failed to acquire SQL keywords from JDBC driver.", e);
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
||||
private static Scn readScnColumnAsScn(ResultSet rs, String columnName) throws SQLException {
|
||||
final String value = rs.getString(columnName);
|
||||
return Strings.isNullOrEmpty(value) ? Scn.NULL : Scn.valueOf(value);
|
||||
|
@ -44,7 +44,7 @@ public OracleSignalBasedIncrementalSnapshotChangeEventSource(RelationalDatabaseC
|
||||
@Override
|
||||
protected String getSignalTableName(String dataCollectionId) {
|
||||
final TableId tableId = OracleTableIdParser.parse(dataCollectionId);
|
||||
return tableId.schema() + "." + tableId.table();
|
||||
return OracleTableIdParser.quoteIfNeeded(tableId, false, true, ((OracleConnection) jdbcConnection).getSQLKeywords());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -5,7 +5,10 @@
|
||||
*/
|
||||
package io.debezium.connector.oracle;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.util.Strings;
|
||||
|
||||
/**
|
||||
* Specialized parser implementation for Oracle {@link TableId} instances.
|
||||
@ -25,7 +28,27 @@ public static TableId parse(String identifier) {
|
||||
final String catalogName = resolveCatalogFromDomainName(parts);
|
||||
return new TableId(catalogName, schemaName, tableName);
|
||||
}
|
||||
return TableId.parse(identifier);
|
||||
return TableId.parse(identifier, false);
|
||||
}
|
||||
|
||||
public static String quoteIfNeeded(TableId tableId, boolean useCatalog, boolean useSchema, List<String> keywords) {
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
if (useCatalog) {
|
||||
sb.append(quotePartIfNeeded(tableId.catalog(), keywords)).append(".");
|
||||
}
|
||||
else if (useSchema) {
|
||||
sb.append(quotePartIfNeeded(tableId.schema(), keywords)).append(".");
|
||||
}
|
||||
return sb.append(quotePartIfNeeded(tableId.table(), keywords)).toString();
|
||||
}
|
||||
|
||||
private static String quotePartIfNeeded(String part, List<String> keywords) {
|
||||
if (!Strings.isNullOrEmpty(part)) {
|
||||
if (part.startsWith("_") || keywords.stream().anyMatch(keyword -> keyword.equalsIgnoreCase(part))) {
|
||||
return "\"" + part + "\"";
|
||||
}
|
||||
}
|
||||
return part;
|
||||
}
|
||||
|
||||
private static String resolveCatalogFromDomainName(String[] parts) {
|
||||
|
@ -44,11 +44,11 @@ public void before() throws Exception {
|
||||
createTables();
|
||||
|
||||
// todo: creates signal table in the PDB, do we want it to be in the CDB?
|
||||
TestHelper.dropTable(connection, "debezium_signal");
|
||||
connection.execute("CREATE TABLE debezium_signal (id varchar2(64), type varchar2(32), data varchar2(2048))");
|
||||
connection.execute("GRANT INSERT on debezium_signal to " + TestHelper.getConnectorUserName());
|
||||
connection.execute("GRANT DELETE on debezium_signal to " + TestHelper.getConnectorUserName());
|
||||
TestHelper.streamTable(connection, "debezium_signal");
|
||||
TestHelper.dropTable(connection, "\"__DEBEZIUM_SIGNAL\"");
|
||||
connection.execute("CREATE TABLE \"__DEBEZIUM_SIGNAL\" (id varchar2(64), type varchar2(32), data varchar2(2048))");
|
||||
connection.execute("GRANT INSERT on \"__DEBEZIUM_SIGNAL\" to " + TestHelper.getConnectorUserName());
|
||||
connection.execute("GRANT DELETE on \"__DEBEZIUM_SIGNAL\" to " + TestHelper.getConnectorUserName());
|
||||
TestHelper.streamTable(connection, "\"__DEBEZIUM_SIGNAL\"");
|
||||
|
||||
setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
|
||||
initializeConnectorTestFramework();
|
||||
@ -60,7 +60,7 @@ public void after() throws Exception {
|
||||
stopConnector();
|
||||
if (connection != null) {
|
||||
dropTables();
|
||||
TestHelper.dropTable(connection, "debezium_signal");
|
||||
TestHelper.dropTable(connection, "\"__DEBEZIUM_SIGNAL\"");
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
@ -149,14 +149,14 @@ protected List<String> tableDataCollectionIds() {
|
||||
|
||||
@Override
|
||||
protected String signalTableName() {
|
||||
return "DEBEZIUM.DEBEZIUM_SIGNAL";
|
||||
return "DEBEZIUM.\"__DEBEZIUM_SIGNAL\"";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Configuration.Builder config() {
|
||||
return TestHelper.defaultConfig()
|
||||
.with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.NO_DATA)
|
||||
.with(OracleConnectorConfig.SIGNAL_DATA_COLLECTION, TestHelper.getDatabaseName() + ".DEBEZIUM.DEBEZIUM_SIGNAL")
|
||||
.with(OracleConnectorConfig.SIGNAL_DATA_COLLECTION, TestHelper.getDatabaseName() + ".DEBEZIUM.\"__DEBEZIUM_SIGNAL\"")
|
||||
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.A,DEBEZIUM\\.B,DEBEZIUM\\.A42")
|
||||
.with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "DEBEZIUM.A42:pk1,pk2,pk3,pk4")
|
||||
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true);
|
||||
@ -173,7 +173,7 @@ protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean s
|
||||
}
|
||||
return TestHelper.defaultConfig()
|
||||
.with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
|
||||
.with(OracleConnectorConfig.SIGNAL_DATA_COLLECTION, TestHelper.getDatabaseName() + ".DEBEZIUM.DEBEZIUM_SIGNAL")
|
||||
.with(OracleConnectorConfig.SIGNAL_DATA_COLLECTION, TestHelper.getDatabaseName() + ".DEBEZIUM.\"__DEBEZIUM_SIGNAL\"")
|
||||
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, tableIncludeList)
|
||||
.with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "DEBEZIUM.A42:pk1,pk2,pk3,pk4")
|
||||
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl);
|
||||
|
@ -7,6 +7,9 @@
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import io.debezium.doc.FixFor;
|
||||
@ -35,4 +38,31 @@ public void shouldParseFullyQualifiedTableWithoutDomainNameAsCatalogName() throw
|
||||
assertThat(tableId.catalog()).isEqualTo("SERVER1");
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-7942")
|
||||
public void shouldQuoteTableNameThatStartsWithUnderscoreWithDatabaseAndSchemaNames() throws Exception {
|
||||
final List<String> keywords = Collections.singletonList("FROM");
|
||||
final TableId tableId = OracleTableIdParser.parse("DB.SCHEMA.__DEBEZIUM_SIGNAL");
|
||||
final String quotedValue = OracleTableIdParser.quoteIfNeeded(tableId, false, true, keywords);
|
||||
assertThat(quotedValue).isEqualTo("SCHEMA.\"__DEBEZIUM_SIGNAL\"");
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-7942")
|
||||
public void shouldQuoteTableNameThatStartsWithUnderscoreWithSchemaName() throws Exception {
|
||||
final List<String> keywords = Collections.singletonList("FROM");
|
||||
final TableId tableId = OracleTableIdParser.parse("SCHEMA.__DEBEZIUM_SIGNAL");
|
||||
final String quotedValue = OracleTableIdParser.quoteIfNeeded(tableId, false, true, keywords);
|
||||
assertThat(quotedValue).isEqualTo("SCHEMA.\"__DEBEZIUM_SIGNAL\"");
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-7942")
|
||||
public void shouldQuoteTableNameThatContainsDatabaseKeyword() throws Exception {
|
||||
final List<String> keywords = Collections.singletonList("FROM");
|
||||
final TableId tableId = OracleTableIdParser.parse("SCHEMA.FROM");
|
||||
final String quotedValue = OracleTableIdParser.quoteIfNeeded(tableId, false, true, keywords);
|
||||
assertThat(quotedValue).isEqualTo("SCHEMA.\"FROM\"");
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user