DBZ-1166 Test with multiple schemas
This commit is contained in:
parent
92b07d518f
commit
06a436ec98
@ -9,7 +9,6 @@
|
||||
import java.sql.Statement;
|
||||
import java.sql.Types;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
@ -102,7 +101,7 @@ public void readSchema(Tables tables, String databaseCatalog, String schemaNameP
|
||||
super.readSchema(tables, null, schemaNamePattern, null, columnFilter, removeTablesNotFoundInJdbc);
|
||||
|
||||
Set<TableId> tableIds = tables.tableIds().stream().filter(x -> schemaNamePattern.equals(x.schema())).collect(Collectors.toSet());
|
||||
|
||||
|
||||
for (TableId tableId : tableIds) {
|
||||
// super.readSchema() populates ids without the catalog; hence we apply the filtering only
|
||||
// here and if a table is included, overwrite it with a new id including the catalog
|
||||
|
@ -34,10 +34,12 @@
|
||||
public class OracleConnectorFilterIT extends AbstractConnectorTest {
|
||||
|
||||
private static OracleConnection connection;
|
||||
private static OracleConnection adminConnection;
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws SQLException {
|
||||
connection = TestHelper.testConnection();
|
||||
adminConnection = TestHelper.adminConnection();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
@ -54,6 +56,24 @@ public void before() throws SQLException {
|
||||
TestHelper.dropTable(connection, "debezium.table2");
|
||||
TestHelper.dropTable(connection, "debezium.table3");
|
||||
|
||||
try {
|
||||
adminConnection.execute("DROP USER debezium2 CASCADE");
|
||||
}
|
||||
catch (SQLException e) {
|
||||
}
|
||||
|
||||
adminConnection.execute(
|
||||
"CREATE USER debezium2 IDENTIFIED BY dbz",
|
||||
"GRANT CONNECT TO debezium2",
|
||||
"GRANT CREATE SESSION TO debezium2",
|
||||
"GRANT CREATE TABLE TO debezium2",
|
||||
"GRANT CREATE SEQUENCE TO debezium2",
|
||||
"ALTER USER debezium2 QUOTA 100M ON users",
|
||||
"create table debezium2.table2 (id numeric(9,0) not null,name varchar2(1000),primary key (id))",
|
||||
"GRANT ALL PRIVILEGES ON debezium2.table2 to debezium",
|
||||
"GRANT SELECT ON debezium2.table2 to " + TestHelper.CONNECTOR_USER,
|
||||
"ALTER TABLE debezium2.table2 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"
|
||||
);
|
||||
String ddl = "create table debezium.table1 (" +
|
||||
" id numeric(9,0) not null, " +
|
||||
" name varchar2(1000), " +
|
||||
@ -83,7 +103,7 @@ public void shouldApplyWhitelistConfiguration() throws Exception {
|
||||
Configuration config = TestHelper.defaultConfig()
|
||||
.with(
|
||||
RelationalDatabaseConnectorConfig.TABLE_WHITELIST,
|
||||
"ORCLPDB1\\.DEBEZIUM\\.TABLE1,ORCLPDB1\\.DEBEZIUM\\.TABLE3")
|
||||
"ORCLPDB1\\.DEBEZIUM2\\.TABLE2,ORCLPDB1\\.DEBEZIUM\\.TABLE1,ORCLPDB1\\.DEBEZIUM\\.TABLE3")
|
||||
.with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY)
|
||||
.build();
|
||||
|
||||
|
@ -84,6 +84,19 @@ private static JdbcConfiguration testJdbcConfig() {
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a JDBC configuration for database admin user (NOT the XStream user).
|
||||
*/
|
||||
private static JdbcConfiguration adminJdbcConfig() {
|
||||
return JdbcConfiguration.copy(Configuration.fromSystemProperties("database.admin."))
|
||||
.withDefault(JdbcConfiguration.HOSTNAME, "localhost")
|
||||
.withDefault(JdbcConfiguration.PORT, 1521)
|
||||
.withDefault(JdbcConfiguration.USER, "sys as sysdba")
|
||||
.withDefault(JdbcConfiguration.PASSWORD, "top_secret")
|
||||
.withDefault(JdbcConfiguration.DATABASE, "ORCLPDB1")
|
||||
.build();
|
||||
}
|
||||
|
||||
private static Configuration.Builder testConfig() {
|
||||
JdbcConfiguration jdbcConfiguration = testJdbcConfig();
|
||||
Configuration.Builder builder = Configuration.create();
|
||||
@ -95,6 +108,17 @@ private static Configuration.Builder testConfig() {
|
||||
return builder;
|
||||
}
|
||||
|
||||
private static Configuration.Builder adminConfig() {
|
||||
JdbcConfiguration jdbcConfiguration = adminJdbcConfig();
|
||||
Configuration.Builder builder = Configuration.create();
|
||||
|
||||
jdbcConfiguration.forEach(
|
||||
(field, value) -> builder.with(OracleConnectorConfig.DATABASE_CONFIG_PREFIX + field, value)
|
||||
);
|
||||
|
||||
return builder;
|
||||
}
|
||||
|
||||
public static OracleConnection testConnection() {
|
||||
Configuration config = testConfig().build();
|
||||
Configuration jdbcConfig = config.subset("database.", true);
|
||||
@ -116,6 +140,27 @@ public static OracleConnection testConnection() {
|
||||
return jdbcConnection;
|
||||
}
|
||||
|
||||
public static OracleConnection adminConnection() {
|
||||
Configuration config = adminConfig().build();
|
||||
Configuration jdbcConfig = config.subset("database.", true);
|
||||
|
||||
OracleConnection jdbcConnection = new OracleConnection(jdbcConfig, new OracleConnectionFactory());
|
||||
try {
|
||||
jdbcConnection.setAutoCommit(false);
|
||||
}
|
||||
catch (SQLException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
String pdbName = new OracleConnectorConfig(config).getPdbName();
|
||||
|
||||
if (pdbName != null) {
|
||||
jdbcConnection.setSessionToPdb(pdbName);
|
||||
}
|
||||
|
||||
return jdbcConnection;
|
||||
}
|
||||
|
||||
public static void dropTable(OracleConnection connection, String table) {
|
||||
try {
|
||||
connection.execute("drop table " + table);
|
||||
|
Loading…
Reference in New Issue
Block a user