diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java index c3ffa5765..68d1224aa 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java @@ -76,14 +76,6 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector .withDescription("Name of the pluggable database when working with a multi-tenant set-up. " + "The CDB name must be given via " + DATABASE_NAME.name() + " in this case."); - public static final Field SCHEMA_NAME = Field.create(DATABASE_CONFIG_PREFIX + "schema") - .withDisplayName("Schema name") - .withType(Type.STRING) - .withWidth(Width.MEDIUM) - .withImportance(Importance.HIGH) - .withValidation(OracleConnectorConfig::validateDatabaseSchema) - .withDescription("Name of the connection user to the database "); - public static final Field XSTREAM_SERVER_NAME = Field.create(DATABASE_CONFIG_PREFIX + "out.server.name") .withDisplayName("XStream out server name") .withType(Type.STRING) @@ -314,7 +306,6 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector Heartbeat.HEARTBEAT_TOPICS_PREFIX, TABLENAME_CASE_INSENSITIVE, ORACLE_VERSION, - SCHEMA_NAME, CONNECTOR_ADAPTER, LOG_MINING_STRATEGY, SNAPSHOT_ENHANCEMENT_TOKEN, @@ -341,7 +332,6 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector private final boolean tablenameCaseInsensitive; private final OracleVersion oracleVersion; - private final String schemaName; private final Tables.ColumnNameFilter columnFilter; private final HistoryRecorder logMiningHistoryRecorder; private final Configuration jdbcConfig; @@ -374,7 +364,6 @@ public OracleConnectorConfig(Configuration config) { this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE)); this.tablenameCaseInsensitive = config.getBoolean(TABLENAME_CASE_INSENSITIVE); this.oracleVersion = OracleVersion.parse(config.getString(ORACLE_VERSION)); - this.schemaName = toUpperCase(config.getString(SCHEMA_NAME)); String blacklistedColumns = toUpperCase(config.getString(RelationalDatabaseConnectorConfig.COLUMN_BLACKLIST)); this.columnFilter = getColumnNameFilter(blacklistedColumns); this.logMiningHistoryRecorder = resolveLogMiningHistoryRecorder(config); @@ -429,7 +418,7 @@ public static ConfigDef configDef() { Field.group(config, "Oracle", HOSTNAME, PORT, RelationalDatabaseConnectorConfig.USER, RelationalDatabaseConnectorConfig.PASSWORD, SERVER_NAME, RelationalDatabaseConnectorConfig.DATABASE_NAME, PDB_NAME, - XSTREAM_SERVER_NAME, SNAPSHOT_MODE, CONNECTOR_ADAPTER, LOG_MINING_STRATEGY, URL, TABLENAME_CASE_INSENSITIVE, ORACLE_VERSION, SCHEMA_NAME); + XSTREAM_SERVER_NAME, SNAPSHOT_MODE, CONNECTOR_ADAPTER, LOG_MINING_STRATEGY, URL, TABLENAME_CASE_INSENSITIVE, ORACLE_VERSION); Field.group(config, "History Storage", KafkaDatabaseHistory.BOOTSTRAP_SERVERS, KafkaDatabaseHistory.TOPIC, KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS, KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY); @@ -461,6 +450,10 @@ public String getPdbName() { return pdbName; } + public String getCatalogName() { + return pdbName != null ? pdbName : databaseName; + } + public String getXoutServerName() { return xoutServerName; } @@ -477,10 +470,6 @@ public OracleVersion getOracleVersion() { return oracleVersion; } - public String getSchemaName() { - return schemaName; - } - public Tables.ColumnNameFilter getColumnFilter() { return columnFilter; } @@ -943,19 +932,6 @@ public String getConnectorName() { return Module.name(); } - public static int validateDatabaseSchema(Configuration config, Field field, ValidationOutput problems) { - if (ConnectorAdapter.LOG_MINER.equals(ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER)))) { - final String schemaName = config.getString(SCHEMA_NAME); - if (schemaName == null || schemaName.trim().length() == 0) { - problems.accept(SCHEMA_NAME, schemaName, "The '" + SCHEMA_NAME.name() + "' be provided when using the LogMiner connection adapter"); - return 1; - } - } - - // Everything checks out ok. - return 0; - } - public static int validateOutServerName(Configuration config, Field field, ValidationOutput problems) { if (ConnectorAdapter.XSTREAM.equals(ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER)))) { return Field.isRequired(config, field, problems); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java index b6f8bb037..c02f3227d 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java @@ -74,13 +74,12 @@ protected SnapshotContext prepare(ChangeEventSourceContext context) throws Excep jdbcConnection.setSessionToPdb(connectorConfig.getPdbName()); } - return new OracleSnapshotContext( - connectorConfig.getPdbName() != null ? connectorConfig.getPdbName() : connectorConfig.getDatabaseName()); + return new OracleSnapshotContext(connectorConfig.getCatalogName()); } @Override protected Set getAllTableIds(RelationalSnapshotContext ctx) throws Exception { - return jdbcConnection.getAllTableIds(ctx.catalogName, connectorConfig.getSchemaName(), false); + return jdbcConnection.getAllTableIds(ctx.catalogName, null, false); // this very slow approach(commented out), it took 30 minutes on an instance with 600 tables // return jdbcConnection.readTableNames(ctx.catalogName, null, null, new String[] {"TABLE"} ); } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java index e81ce9e08..d754408a1 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java @@ -49,7 +49,7 @@ class LogMinerQueryResultProcessor { private final OracleDatabaseSchema schema; private final EventDispatcher dispatcher; private final TransactionalBufferMetrics transactionalBufferMetrics; - private final String catalogName; + private final OracleConnectorConfig connectorConfig; private final Clock clock; private final Logger LOGGER = LoggerFactory.getLogger(LogMinerQueryResultProcessor.class); private long currentOffsetScn = 0; @@ -62,7 +62,7 @@ class LogMinerQueryResultProcessor { TransactionalBuffer transactionalBuffer, OracleOffsetContext offsetContext, OracleDatabaseSchema schema, EventDispatcher dispatcher, - String catalogName, Clock clock, HistoryRecorder historyRecorder) { + Clock clock, HistoryRecorder historyRecorder) { this.context = context; this.metrics = metrics; this.transactionalBuffer = transactionalBuffer; @@ -70,16 +70,16 @@ class LogMinerQueryResultProcessor { this.schema = schema; this.dispatcher = dispatcher; this.transactionalBufferMetrics = transactionalBuffer.getMetrics(); - this.catalogName = catalogName; this.clock = clock; this.historyRecorder = historyRecorder; - this.dmlParser = resolveParser(connectorConfig, catalogName, jdbcConnection); + this.connectorConfig = connectorConfig; + this.dmlParser = resolveParser(connectorConfig, jdbcConnection); } - private static DmlParser resolveParser(OracleConnectorConfig connectorConfig, String catalogName, OracleConnection connection) { + private static DmlParser resolveParser(OracleConnectorConfig connectorConfig, OracleConnection connection) { if (connectorConfig.getLogMiningDmlParser().equals(LogMiningDmlParser.LEGACY)) { OracleValueConverters converter = new OracleValueConverters(connectorConfig, connection); - return new SimpleDmlParser(catalogName, connectorConfig.getSchemaName(), converter); + return new SimpleDmlParser(connectorConfig.getCatalogName(), converter); } return new LogMinerDmlParser(); } @@ -175,6 +175,11 @@ int processResult(ResultSet resultSet) { // DML if (operationCode == RowMapper.INSERT || operationCode == RowMapper.DELETE || operationCode == RowMapper.UPDATE) { + final TableId tableId = RowMapper.getTableId(connectorConfig.getCatalogName(), resultSet); + if (!connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) { + continue; + } + LOGGER.trace("DML, {}, sql {}", logMessage, redoSql); dmlCounter++; switch (operationCode) { @@ -190,7 +195,7 @@ int processResult(ResultSet resultSet) { } Instant parseStart = Instant.now(); - LogMinerDmlEntry dmlEntry = dmlParser.parse(redoSql, schema.getTables(), txId); + LogMinerDmlEntry dmlEntry = dmlParser.parse(redoSql, schema.getTables(), tableId, txId); metrics.addCurrentParseTime(Duration.between(parseStart, Instant.now())); if (dmlEntry == null || redoSql == null) { @@ -214,7 +219,6 @@ int processResult(ResultSet resultSet) { dmlEntry.setScn(scn); try { - TableId tableId = RowMapper.getTableId(catalogName, resultSet); transactionalBuffer.registerCommitCallback(txId, scn, changeTime.toInstant(), (timestamp, smallestScn, commitScn, counter) -> { // update SCN in offset context only if processed SCN less than SCN among other transactions if (smallestScn == null || scn.compareTo(smallestScn) < 0) { diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java index ad38a4648..dea171ba7 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java @@ -64,7 +64,6 @@ public class LogMinerStreamingChangeEventSource implements StreamingChangeEventS private final Clock clock; private final OracleDatabaseSchema schema; private final OracleOffsetContext offsetContext; - private final String catalogName; private final boolean isRac; private final Set racHosts = new HashSet<>(); private final JdbcConfiguration jdbcConfiguration; @@ -89,7 +88,6 @@ public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig, this.schema = schema; this.offsetContext = offsetContext; this.connectorConfig = connectorConfig; - this.catalogName = (connectorConfig.getPdbName() != null) ? connectorConfig.getPdbName() : connectorConfig.getDatabaseName(); this.strategy = connectorConfig.getLogMiningStrategy(); this.isContinuousMining = connectorConfig.isContinuousMining(); this.errorHandler = errorHandler; @@ -142,9 +140,9 @@ public void execute(ChangeEventSourceContext context) { final LogMinerQueryResultProcessor processor = new LogMinerQueryResultProcessor(context, jdbcConnection, connectorConfig, logMinerMetrics, transactionalBuffer, offsetContext, schema, dispatcher, - catalogName, clock, historyRecorder); + clock, historyRecorder); - final String query = SqlUtils.logMinerContentsQuery(connectorConfig.getSchemaName(), jdbcConnection.username(), schema); + final String query = SqlUtils.logMinerContentsQuery(connectorConfig, jdbcConnection.username()); try (PreparedStatement miningView = jdbcConnection.connection().prepareStatement(query, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT)) { Set currentRedoLogFiles = getCurrentRedoLogFiles(jdbcConnection, logMinerMetrics); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/RowMapper.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/RowMapper.java index 073544192..6cb8f38e7 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/RowMapper.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/RowMapper.java @@ -13,6 +13,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.DebeziumException; import io.debezium.relational.TableId; import io.debezium.util.HexConverter; @@ -191,8 +192,13 @@ private static void logError(TransactionalBufferMetrics metrics, SQLException e, LogMinerHelper.logError(metrics, "Cannot get {}. This entry from LogMiner will be lost due to the {}", s, e); } - public static TableId getTableId(String catalogName, ResultSet rs) throws SQLException { - return new TableId(catalogName, rs.getString(SEG_OWNER), rs.getString(TABLE_NAME)); + public static TableId getTableId(String catalogName, ResultSet rs) { + try { + return new TableId(catalogName, rs.getString(SEG_OWNER), rs.getString(TABLE_NAME)); + } + catch (SQLException e) { + throw new DebeziumException("Cannot resolve TableId from result set data", e); + } } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/SqlUtils.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/SqlUtils.java index ffd3f0563..196d5fcb9 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/SqlUtils.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/SqlUtils.java @@ -9,16 +9,16 @@ import java.sql.SQLRecoverableException; import java.time.Duration; import java.time.LocalDateTime; +import java.util.Iterator; import java.util.List; -import java.util.StringJoiner; -import java.util.stream.Collectors; +import java.util.regex.Pattern; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.debezium.connector.oracle.OracleConnectorConfig; -import io.debezium.connector.oracle.OracleDatabaseSchema; import io.debezium.relational.TableId; +import io.debezium.util.Strings; import oracle.net.ns.NetException; @@ -199,34 +199,111 @@ static String startLogMinerStatement(Long startScn, Long endScn, OracleConnector } /** - * This is the query from the LogMiner view to get changes. Columns of the view we using are: - * NOTE. Currently we do not capture changes from other schemas - * SCN - The SCN at which a change was made - * COMMIT_SCN - The SCN at which a change was committed - * USERNAME - Name of the user who executed the transaction - * SQL_REDO Reconstructed SQL statement that is equivalent to the original SQL statement that made the change - * OPERATION_CODE - Number of the operation code. - * TABLE_NAME - Name of the modified table - * TIMESTAMP - Timestamp when the database change was made + * This is the query from the LogMiner view to get changes. * - * @param schemaName user name + * The query uses the following columns from the view: + *
+     * SCN - The SCN at which a change was made
+     * SQL_REDO Reconstructed SQL statement that is equivalent to the original SQL statement that made the change
+     * OPERATION_CODE - Number of the operation code
+     * TIMESTAMP - Timestamp when the database change was made
+     * XID - Transaction Identifier
+     * CSF - Continuation SQL flag, identifies rows that should be processed together as a single row (0=no,1=yes)
+     * TABLE_NAME - Name of the modified table
+     * SEG_OWNER - Schema/Tablespace name
+     * OPERATION - Database operation type
+     * USERNAME - Name of the user who executed the transaction
+     * 
+ * + * @param connectorConfig the connector configuration * @param logMinerUser log mining session user name - * @param schema schema * @return the query */ - static String logMinerContentsQuery(String schemaName, String logMinerUser, OracleDatabaseSchema schema) { - List whiteListTableNames = schema.tableIds().stream().map(TableId::table).collect(Collectors.toList()); + static String logMinerContentsQuery(OracleConnectorConfig connectorConfig, String logMinerUser) { + StringBuilder query = new StringBuilder(); + query.append("SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME "); + query.append("FROM ").append(LOGMNR_CONTENTS_VIEW).append(" "); + query.append("WHERE OPERATION_CODE IN (1,2,3,5) "); + query.append("AND SCN >= ? "); + query.append("AND SCN < ? "); + query.append("AND TABLE_NAME != '").append(LOGMNR_FLUSH_TABLE).append("' "); - // todo: add ROW_ID, SESSION#, SERIAL#, RS_ID, and SSN - return "SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME " + - " FROM " + LOGMNR_CONTENTS_VIEW + " WHERE OPERATION_CODE in (1,2,3,5) " + // 5 - DDL - " AND SEG_OWNER = '" + schemaName.toUpperCase() + "' " + - buildTableInPredicate(whiteListTableNames) + - " AND SCN >= ? AND SCN < ? " + - // Capture DDL and MISSING_SCN rows only hwne not performed by SYS, SYSTEM, and LogMiner user - " OR (OPERATION_CODE IN (5,34) AND USERNAME NOT IN ('SYS','SYSTEM','" + logMinerUser.toUpperCase() + "')) " + - // Capture all COMMIT and ROLLBACK operations performed by the database - " OR (OPERATION_CODE IN (7,36))"; // todo username = schemaName? + String schemaPredicate = buildSchemaPredicate(connectorConfig); + if (!Strings.isNullOrEmpty(schemaPredicate)) { + query.append("AND ").append(schemaPredicate).append(" "); + } + + String tablePredicate = buildTablePredicate(connectorConfig); + if (!Strings.isNullOrEmpty(tablePredicate)) { + query.append("AND ").append(tablePredicate).append(" "); + } + + query.append("OR (OPERATION_CODE IN (5,34) AND USERNAME NOT IN ('SYS','SYSTEM','").append(logMinerUser.toUpperCase()).append("')) "); + query.append("OR (OPERATION_CODE IN (7,36))"); + + return query.toString(); + } + + private static String buildSchemaPredicate(OracleConnectorConfig connectorConfig) { + StringBuilder predicate = new StringBuilder(); + if (Strings.isNullOrEmpty(connectorConfig.schemaIncludeList())) { + if (!Strings.isNullOrEmpty(connectorConfig.schemaExcludeList())) { + List patterns = Strings.listOfRegex(connectorConfig.schemaExcludeList(), 0); + predicate.append("(").append(listOfPatternsToSql(patterns, "SEG_OWNER", true)).append(")"); + } + } + else { + List patterns = Strings.listOfRegex(connectorConfig.schemaIncludeList(), 0); + predicate.append("(").append(listOfPatternsToSql(patterns, "SEG_OWNER", false)).append(")"); + } + return predicate.toString(); + } + + private static String buildTablePredicate(OracleConnectorConfig connectorConfig) { + StringBuilder predicate = new StringBuilder(); + if (Strings.isNullOrEmpty(connectorConfig.tableIncludeList())) { + if (!Strings.isNullOrEmpty(connectorConfig.tableExcludeList())) { + List patterns = Strings.listOfRegex(connectorConfig.tableExcludeList(), 0); + predicate.append("(").append(listOfPatternsToSql(patterns, "SEG_OWNER || '.' || TABLE_NAME", true)).append(")"); + } + } + else { + List patterns = Strings.listOfRegex(connectorConfig.tableIncludeList(), 0); + predicate.append("(").append(listOfPatternsToSql(patterns, "SEG_OWNER || '.' || TABLE_NAME", false)).append(")"); + } + return predicate.toString(); + } + + private static String listOfPatternsToSql(List patterns, String columnName, boolean applyNot) { + StringBuilder predicate = new StringBuilder(); + for (Iterator i = patterns.iterator(); i.hasNext();) { + Pattern pattern = i.next(); + if (applyNot) { + predicate.append("NOT "); + } + // NOTE: The REGEXP_LIKE operator was added in Oracle 10g (10.1.0.0.0) + final String text = resolveRegExpLikePattern(pattern); + predicate.append("REGEXP_LIKE(").append(columnName).append(",'").append(text).append("','i')"); + if (i.hasNext()) { + // Exclude lists imply combining them via AND, Include lists imply combining them via OR? + predicate.append(applyNot ? " AND " : " OR "); + } + } + return predicate.toString(); + } + + private static String resolveRegExpLikePattern(Pattern pattern) { + // The REGEXP_LIKE operator acts identical to LIKE in that it automatically prepends/appends "%". + // We need to resolve our matches to be explicit with "^" and "$" if they don't already exist so + // that the LIKE aspect of the match doesn't mistakenly filter "DEBEZIUM2" when using "DEBEZIUM". + String text = pattern.pattern(); + if (!text.startsWith("^")) { + text = "^" + text; + } + if (!text.endsWith("$")) { + text += "$"; + } + return text; } static String addLogFileStatement(String option, String fileName) { @@ -335,21 +412,4 @@ public static long parseRetentionFromName(String historyTableName) { Integer.parseInt(tokens[6])); // minutes return Duration.between(recorded, LocalDateTime.now()).toHours(); } - - /** - * This method builds table_name IN predicate, filtering out non whitelisted tables from Log Mining. - * It limits joining condition over 1000 tables, Oracle will throw exception in such predicate. - * @param tables white listed table names - * @return IN predicate or empty string if number of whitelisted tables exceeds 1000 - */ - private static String buildTableInPredicate(List tables) { - if (tables.size() == 0 || tables.size() > 1000) { - LOGGER.warn(" Cannot apply {} whitelisted tables condition", tables.size()); - return ""; - } - - StringJoiner tableNames = new StringJoiner(","); - tables.forEach(table -> tableNames.add("'" + table + "'")); - return " AND table_name IN (" + tableNames + ") "; - } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/parser/DmlParser.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/parser/DmlParser.java index c00d950f7..ab70f7002 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/parser/DmlParser.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/parser/DmlParser.java @@ -6,6 +6,7 @@ package io.debezium.connector.oracle.logminer.parser; import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry; +import io.debezium.relational.TableId; import io.debezium.relational.Tables; /** @@ -19,9 +20,10 @@ public interface DmlParser { * * @param sql the sql statement * @param tables collection of known tables. + * @param tableId the table identifier * @param txId the current transaction id the sql is part of. * @return the parsed sql as a DML entry or {@code null} if the SQL couldn't be parsed. * @throws DmlParserException thrown if a parse exception is detected. */ - LogMinerDmlEntry parse(String sql, Tables tables, String txId); + LogMinerDmlEntry parse(String sql, Tables tables, TableId tableId, String txId); } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/parser/LogMinerDmlParser.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/parser/LogMinerDmlParser.java index b045dbbee..4f91672fb 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/parser/LogMinerDmlParser.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/parser/LogMinerDmlParser.java @@ -15,6 +15,7 @@ import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry; import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntryImpl; import io.debezium.data.Envelope; +import io.debezium.relational.TableId; import io.debezium.relational.Tables; /** @@ -70,7 +71,7 @@ public class LogMinerDmlParser implements DmlParser { private static final int WHERE_LENGTH = WHERE.length(); @Override - public LogMinerDmlEntry parse(String sql, Tables tables, String txId) { + public LogMinerDmlEntry parse(String sql, Tables tables, TableId tableId, String txId) { if (sql != null && sql.length() > 0) { switch (sql.charAt(0)) { case 'i': diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/parser/SimpleDmlParser.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/parser/SimpleDmlParser.java index b7d98256e..878525411 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/parser/SimpleDmlParser.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/parser/SimpleDmlParser.java @@ -53,7 +53,6 @@ public class SimpleDmlParser implements DmlParser { private static final Logger LOGGER = LoggerFactory.getLogger(SimpleDmlParser.class); protected final String catalogName; - protected final String schemaName; private final OracleValueConverters converter; private final CCJSqlParserManager pm; private final Map newColumnValues = new LinkedHashMap<>(); @@ -64,18 +63,22 @@ public class SimpleDmlParser implements DmlParser { /** * Constructor * @param catalogName database name - * @param schemaName user name * @param converter value converter */ - public SimpleDmlParser(String catalogName, String schemaName, OracleValueConverters converter) { + public SimpleDmlParser(String catalogName, OracleValueConverters converter) { this.catalogName = catalogName; - this.schemaName = schemaName; this.converter = converter; pm = new CCJSqlParserManager(); } - @Override - public LogMinerDmlEntry parse(String dmlContent, Tables tables, String txId) { + /** + * This parses a DML + * @param dmlContent DML + * @param tables debezium Tables + * @param tableId the TableId for the log miner contents view row + * @return parsed value holder class + */ + public LogMinerDmlEntry parse(String dmlContent, Tables tables, TableId tableId, String txId) { try { // If a table contains Spatial data type, DML input generates two entries in REDO LOG. @@ -99,7 +102,7 @@ public LogMinerDmlEntry parse(String dmlContent, Tables tables, String txId) { Statement st = pm.parse(new StringReader(dmlContent)); if (st instanceof Update) { - parseUpdate(tables, (Update) st); + parseUpdate(tables, tableId, (Update) st); List actualNewValues = newColumnValues.values().stream() .filter(LogMinerColumnValueWrapper::isProcessed).map(LogMinerColumnValueWrapper::getColumnValue).collect(Collectors.toList()); List actualOldValues = oldColumnValues.values().stream() @@ -108,14 +111,14 @@ public LogMinerDmlEntry parse(String dmlContent, Tables tables, String txId) { } else if (st instanceof Insert) { - parseInsert(tables, (Insert) st); + parseInsert(tables, tableId, (Insert) st); List actualNewValues = newColumnValues.values() .stream().map(LogMinerColumnValueWrapper::getColumnValue).collect(Collectors.toList()); return new LogMinerDmlEntryImpl(Envelope.Operation.CREATE, actualNewValues, Collections.emptyList()); } else if (st instanceof Delete) { - parseDelete(tables, (Delete) st); + parseDelete(tables, tableId, (Delete) st); List actualOldValues = oldColumnValues.values() .stream().map(LogMinerColumnValueWrapper::getColumnValue).collect(Collectors.toList()); return new LogMinerDmlEntryImpl(Envelope.Operation.DELETE, Collections.emptyList(), actualOldValues); @@ -130,11 +133,13 @@ else if (st instanceof Delete) { } } - private void initColumns(Tables tables, String tableName) { - table = tables.forTable(catalogName, schemaName, tableName); + private void initColumns(Tables tables, TableId tableId, String tableName) { + if (!tableId.table().equals(tableName)) { + throw new ParsingException(null, "Resolved TableId expected table name '" + tableId.table() + "' but is '" + tableName + "'"); + } + table = tables.forTable(tableId); if (table == null) { - TableId id = new TableId(catalogName, schemaName, tableName); - throw new ParsingException(null, "Trying to parse a table '" + id + "', which does not exist."); + throw new ParsingException(null, "Trying to parse a table '" + tableId + "', which does not exist."); } for (int i = 0; i < table.columns().size(); i++) { Column column = table.columns().get(i); @@ -147,13 +152,13 @@ private void initColumns(Tables tables, String tableName) { } // this parses simple statement with only one table - private void parseUpdate(Tables tables, Update st) throws JSQLParserException { + private void parseUpdate(Tables tables, TableId tableId, Update st) throws JSQLParserException { int tableCount = st.getTables().size(); if (tableCount > 1 || tableCount == 0) { throw new JSQLParserException("DML includes " + tableCount + " tables"); } net.sf.jsqlparser.schema.Table parseTable = st.getTables().get(0); - initColumns(tables, ParserUtils.stripeQuotes(parseTable.getName())); + initColumns(tables, tableId, ParserUtils.stripeQuotes(parseTable.getName())); List columns = st.getColumns(); Alias alias = parseTable.getAlias(); @@ -171,8 +176,8 @@ private void parseUpdate(Tables tables, Update st) throws JSQLParserException { } } - private void parseInsert(Tables tables, Insert st) { - initColumns(tables, ParserUtils.stripeQuotes(st.getTable().getName())); + private void parseInsert(Tables tables, TableId tableId, Insert st) { + initColumns(tables, tableId, ParserUtils.stripeQuotes(st.getTable().getName())); Alias alias = st.getTable().getAlias(); aliasName = alias == null ? "" : alias.getName().trim(); @@ -189,8 +194,8 @@ public void visit(ExpressionList expressionList) { oldColumnValues.clear(); } - private void parseDelete(Tables tables, Delete st) { - initColumns(tables, ParserUtils.stripeQuotes(st.getTable().getName())); + private void parseDelete(Tables tables, TableId tableId, Delete st) { + initColumns(tables, tableId, ParserUtils.stripeQuotes(st.getTable().getName())); Alias alias = st.getTable().getAlias(); aliasName = alias == null ? "" : alias.getName().trim(); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorConfigTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorConfigTest.java index 8beb0e7b1..f30a225b4 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorConfigTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorConfigTest.java @@ -48,7 +48,6 @@ public void validLogminerNoUrl() throws Exception { .with(OracleConnectorConfig.SERVER_NAME, "myserver") .with(OracleConnectorConfig.HOSTNAME, "MyHostname") .with(OracleConnectorConfig.DATABASE_NAME, "mydb") - .with(OracleConnectorConfig.SCHEMA_NAME, "myschema") .with(OracleConnectorConfig.USER, "debezium") .build()); assertTrue(connectorConfig.validateAndRecord(OracleConnectorConfig.ALL_FIELDS, LOGGER::error)); @@ -77,7 +76,6 @@ public void validLogminerWithUrl() throws Exception { .with(OracleConnectorConfig.SERVER_NAME, "myserver") .with(OracleConnectorConfig.URL, "MyHostname") .with(OracleConnectorConfig.DATABASE_NAME, "mydb") - .with(OracleConnectorConfig.SCHEMA_NAME, "myschema") .with(OracleConnectorConfig.USER, "debezium") .build()); assertTrue(connectorConfig.validateAndRecord(OracleConnectorConfig.ALL_FIELDS, LOGGER::error)); @@ -93,7 +91,6 @@ public void validUrlTNS() throws Exception { .with(OracleConnectorConfig.URL, "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=192.68.1.11)(PORT=1701))(ADDRESS=(PROTOCOL=TCP)(HOST=192.68.1.12)(PORT=1701))(ADDRESS=(PROTOCOL=TCP)(HOST=192.68.1.13)(PORT=1701))(LOAD_BALANCE = yes)(FAILOVER = on)(CONNECT_DATA =(SERVER = DEDICATED)(SERVICE_NAME = myserver.mydomain.com)(FAILOVER_MODE =(TYPE = SELECT)(METHOD = BASIC)(RETRIES = 3)(DELAY = 5))))") .with(OracleConnectorConfig.DATABASE_NAME, "mydb") - .with(OracleConnectorConfig.SCHEMA_NAME, "myschema") .with(OracleConnectorConfig.USER, "debezium") .build()); assertTrue(connectorConfig.validateAndRecord(OracleConnectorConfig.ALL_FIELDS, LOGGER::error)); @@ -107,7 +104,6 @@ public void invalidNoHostnameNoUri() throws Exception { .with(OracleConnectorConfig.CONNECTOR_ADAPTER, "logminer") .with(OracleConnectorConfig.SERVER_NAME, "myserver") .with(OracleConnectorConfig.DATABASE_NAME, "mydb") - .with(OracleConnectorConfig.SCHEMA_NAME, "myschema") .with(OracleConnectorConfig.USER, "debezium") .build()); assertFalse(connectorConfig.validateAndRecord(OracleConnectorConfig.ALL_FIELDS, LOGGER::error)); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorFilterIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorFilterIT.java index 415171a55..ddf34e2e8 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorFilterIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorFilterIT.java @@ -27,14 +27,13 @@ import org.junit.rules.TestRule; import io.debezium.config.Configuration; +import io.debezium.config.Field; import io.debezium.connector.oracle.OracleConnectorConfig.SnapshotMode; import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule; -import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIs; -import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot; import io.debezium.connector.oracle.util.TestHelper; import io.debezium.data.VerifyRecord; +import io.debezium.doc.FixFor; import io.debezium.embedded.AbstractConnectorTest; -import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.util.Testing; /** @@ -121,174 +120,64 @@ public void after() throws SQLException { } @Test - @SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.LOGMINER, reason = "LogMiner does not support DDL during streaming") public void shouldApplyTableWhitelistConfiguration() throws Exception { - Configuration config = TestHelper.defaultConfig() - .with( - RelationalDatabaseConnectorConfig.TABLE_WHITELIST, - "DEBEZIUM2\\.TABLE2,DEBEZIUM\\.TABLE1,DEBEZIUM\\.TABLE3") - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) - .build(); - - start(OracleConnector.class, config); - assertConnectorIsRunning(); - - waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); - waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); - - connection.execute("INSERT INTO debezium.table1 VALUES (1, 'Text-1')"); - connection.execute("INSERT INTO debezium.table2 VALUES (2, 'Text-2')"); - connection.execute("COMMIT"); - - String ddl = "CREATE TABLE debezium.table3 (" + - " id NUMERIC(9, 0) NOT NULL, " + - " name VARCHAR2(1000), " + - " PRIMARY KEY (id)" + - ")"; - - connection.execute(ddl); - connection.execute("GRANT SELECT ON debezium.table3 TO c##xstrm"); - - connection.execute("INSERT INTO debezium.table3 VALUES (3, 'Text-3')"); - connection.execute("COMMIT"); - - SourceRecords records = consumeRecordsByTopic(2); - - List testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE1"); - assertThat(testTableRecords).hasSize(1); - - VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 1); - Struct after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after"); - assertThat(after.get("ID")).isEqualTo(1); - assertThat(after.get("NAME")).isEqualTo("Text-1"); - - testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE2"); - assertThat(testTableRecords).isNull(); - - testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE3"); - assertThat(testTableRecords).hasSize(1); - - VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 3); - after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after"); - assertThat(after.get("ID")).isEqualTo(3); - assertThat(after.get("NAME")).isEqualTo("Text-3"); + shouldApplyTableInclusionConfiguration(true); } @Test - @SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.LOGMINER, reason = "LogMiner does not support DDL during streaming") public void shouldApplyTableIncludeListConfiguration() throws Exception { - Configuration config = TestHelper.defaultConfig() - .with( - OracleConnectorConfig.TABLE_INCLUDE_LIST, - "DEBEZIUM2\\.TABLE2,DEBEZIUM\\.TABLE1,DEBEZIUM\\.TABLE3") - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) - .build(); - - start(OracleConnector.class, config); - assertConnectorIsRunning(); - - waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); - waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); - - connection.execute("INSERT INTO debezium.table1 VALUES (1, 'Text-1')"); - connection.execute("INSERT INTO debezium.table2 VALUES (2, 'Text-2')"); - connection.execute("COMMIT"); - - String ddl = "CREATE TABLE debezium.table3 (" + - " id NUMERIC(9, 0) NOT NULL, " + - " name VARCHAR2(1000), " + - " PRIMARY KEY (id)" + - ")"; - - connection.execute(ddl); - connection.execute("GRANT SELECT ON debezium.table3 TO " + TestHelper.getConnectorUserName()); - connection.execute("ALTER TABLE debezium.table3 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"); - connection.execute("INSERT INTO debezium.table3 VALUES (3, 'Text-3')"); - connection.execute("COMMIT"); - - SourceRecords records = consumeRecordsByTopic(2); - - List testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE1"); - assertThat(testTableRecords).hasSize(1); - - VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 1); - Struct after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after"); - assertThat(after.get("ID")).isEqualTo(1); - assertThat(after.get("NAME")).isEqualTo("Text-1"); - - testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE2"); - assertThat(testTableRecords).isNull(); - - testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE3"); - assertThat(testTableRecords).hasSize(1); - - VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 3); - after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after"); - assertThat(after.get("ID")).isEqualTo(3); - assertThat(after.get("NAME")).isEqualTo("Text-3"); + shouldApplyTableInclusionConfiguration(false); } @Test - @SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.LOGMINER, reason = "LogMiner does not support DDL during streaming") public void shouldApplyTableBlacklistConfiguration() throws Exception { - Configuration config = TestHelper.defaultConfig() - .with( - OracleConnectorConfig.TABLE_BLACKLIST, - "DEBEZIUM\\.TABLE2,DEBEZIUM\\.CUSTOMER.*") - .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) - .build(); - - start(OracleConnector.class, config); - assertConnectorIsRunning(); - - waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); - waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); - - connection.execute("INSERT INTO debezium.table1 VALUES (1, 'Text-1')"); - connection.execute("INSERT INTO debezium.table2 VALUES (2, 'Text-2')"); - connection.execute("COMMIT"); - - String ddl = "CREATE TABLE debezium.table3 (" + - " id NUMERIC(9,0) NOT NULL, " + - " name VARCHAR2(1000), " + - " PRIMARY KEY (id)" + - ")"; - - connection.execute(ddl); - connection.execute("GRANT SELECT ON debezium.table3 TO " + TestHelper.getConnectorUserName()); - - connection.execute("INSERT INTO debezium.table3 VALUES (3, 'Text-3')"); - connection.execute("COMMIT"); - - SourceRecords records = consumeRecordsByTopic(2); - - List testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE1"); - assertThat(testTableRecords).hasSize(1); - - VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 1); - Struct after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after"); - assertThat(after.get("ID")).isEqualTo(1); - assertThat(after.get("NAME")).isEqualTo("Text-1"); - - testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE2"); - assertThat(testTableRecords).isNull(); - - testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE3"); - assertThat(testTableRecords).hasSize(1); - - VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 3); - after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after"); - assertThat(after.get("ID")).isEqualTo(3); - assertThat(after.get("NAME")).isEqualTo("Text-3"); + shouldApplyTableExclusionsConfiguration(true); } @Test - @SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.LOGMINER, reason = "LogMiner does not support DDL during streaming") public void shouldApplyTableExcludeListConfiguration() throws Exception { + shouldApplyTableExclusionsConfiguration(false); + } + + @Test + @FixFor("DBZ-3009") + public void shouldApplySchemaAndTableWhitelistConfiguration() throws Exception { + shouldApplySchemaAndTableInclusionConfiguration(true); + } + + @Test + @FixFor("DBZ-3009") + public void shouldApplySchemaAndTableIncludeListConfiguration() throws Exception { + shouldApplySchemaAndTableInclusionConfiguration(false); + } + + @Test + @FixFor("DBZ-3009") + public void shouldApplySchemaAndTableBlacklistConfiguration() throws Exception { + shouldApplySchemaAndTableExclusionsConfiguration(true); + } + + @Test + @FixFor("DBZ-3009") + public void shouldApplySchemaAndTableExcludeListConfiguration() throws Exception { + shouldApplySchemaAndTableExclusionsConfiguration(false); + } + + private void shouldApplyTableInclusionConfiguration(boolean useLegacyOption) throws Exception { + Field option = OracleConnectorConfig.TABLE_INCLUDE_LIST; + if (useLegacyOption) { + option = OracleConnectorConfig.TABLE_WHITELIST; + } + + boolean includeDdlChanges = true; + if (TestHelper.adapter().equals(OracleConnectorConfig.ConnectorAdapter.LOG_MINER)) { + // LogMiner currently does not support DDL changes during streaming phase + includeDdlChanges = false; + } + Configuration config = TestHelper.defaultConfig() - .with( - OracleConnectorConfig.TABLE_EXCLUDE_LIST, - "DEBEZIUM\\.TABLE2,DEBEZIUM\\.CUSTOMER.*") + .with(OracleConnectorConfig.SCHEMA_INCLUDE_LIST, "DEBEZIUM") + .with(option, "DEBEZIUM2\\.TABLE2,DEBEZIUM\\.TABLE1,DEBEZIUM\\.TABLE3") .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) .build(); @@ -302,19 +191,21 @@ public void shouldApplyTableExcludeListConfiguration() throws Exception { connection.execute("INSERT INTO debezium.table2 VALUES (2, 'Text-2')"); connection.execute("COMMIT"); - String ddl = "CREATE TABLE debezium.table3 (" + - " id NUMERIC(9,0) NOT NULL, " + - " name VARCHAR2(1000), " + - " PRIMARY KEY (id)" + - ")"; + if (includeDdlChanges) { + String ddl = "CREATE TABLE debezium.table3 (" + + " id NUMERIC(9, 0) NOT NULL, " + + " name VARCHAR2(1000), " + + " PRIMARY KEY (id)" + + ")"; - connection.execute(ddl); - connection.execute("GRANT SELECT ON debezium.table3 TO " + TestHelper.getConnectorUserName()); + connection.execute(ddl); + connection.execute("GRANT SELECT ON debezium.table3 TO " + TestHelper.getConnectorUserName()); + connection.execute("ALTER TABLE debezium.table3 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"); + connection.execute("INSERT INTO debezium.table3 VALUES (3, 'Text-3')"); + connection.execute("COMMIT"); + } - connection.execute("INSERT INTO debezium.table3 VALUES (3, 'Text-3')"); - connection.execute("COMMIT"); - - SourceRecords records = consumeRecordsByTopic(2); + SourceRecords records = consumeRecordsByTopic(includeDdlChanges ? 2 : 1); List testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE1"); assertThat(testTableRecords).hasSize(1); @@ -327,17 +218,229 @@ public void shouldApplyTableExcludeListConfiguration() throws Exception { testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE2"); assertThat(testTableRecords).isNull(); - testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE3"); + if (includeDdlChanges) { + testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE3"); + assertThat(testTableRecords).hasSize(1); + + VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 3); + after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after"); + assertThat(after.get("ID")).isEqualTo(3); + assertThat(after.get("NAME")).isEqualTo("Text-3"); + } + } + + private void shouldApplySchemaAndTableInclusionConfiguration(boolean useLegacyOption) throws Exception { + Field option = OracleConnectorConfig.TABLE_INCLUDE_LIST; + if (useLegacyOption) { + option = OracleConnectorConfig.TABLE_WHITELIST; + } + + boolean includeDdlChanges = true; + if (TestHelper.adapter().equals(OracleConnectorConfig.ConnectorAdapter.LOG_MINER)) { + // LogMiner currently does not support DDL changes during streaming phase + includeDdlChanges = false; + } + + Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.SCHEMA_INCLUDE_LIST, "DEBEZIUM,DEBEZIUM2") + .with(option, "DEBEZIUM2\\.TABLE2,DEBEZIUM\\.TABLE1,DEBEZIUM\\.TABLE3") + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .build(); + + start(OracleConnector.class, config); + assertConnectorIsRunning(); + + waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + connection.execute("INSERT INTO debezium.table1 VALUES (1, 'Text-1')"); + connection.execute("INSERT INTO debezium.table2 VALUES (2, 'Text-2')"); + connection.execute("INSERT INTO debezium2.table2 VALUES (1, 'Text2-1')"); + connection.execute("COMMIT"); + + if (includeDdlChanges) { + String ddl = "CREATE TABLE debezium.table3 (" + + " id NUMERIC(9, 0) NOT NULL, " + + " name VARCHAR2(1000), " + + " PRIMARY KEY (id)" + + ")"; + + connection.execute(ddl); + connection.execute("GRANT SELECT ON debezium.table3 TO " + TestHelper.getConnectorUserName()); + connection.execute("ALTER TABLE debezium.table3 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"); + connection.execute("INSERT INTO debezium.table3 VALUES (3, 'Text-3')"); + connection.execute("COMMIT"); + } + + SourceRecords records = consumeRecordsByTopic(includeDdlChanges ? 3 : 2); + + List testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE1"); assertThat(testTableRecords).hasSize(1); - VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 3); + VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 1); + Struct after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after"); + assertThat(after.get("ID")).isEqualTo(1); + assertThat(after.get("NAME")).isEqualTo("Text-1"); + + testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE2"); + assertThat(testTableRecords).isNull(); + + testTableRecords = records.recordsForTopic("server1.DEBEZIUM2.TABLE2"); + assertThat(testTableRecords).hasSize(1); + + VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 1); after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after"); - assertThat(after.get("ID")).isEqualTo(3); - assertThat(after.get("NAME")).isEqualTo("Text-3"); + assertThat(after.get("ID")).isEqualTo(1); + assertThat(after.get("NAME")).isEqualTo("Text2-1"); + + if (includeDdlChanges) { + testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE3"); + assertThat(testTableRecords).hasSize(1); + + VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 3); + after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after"); + assertThat(after.get("ID")).isEqualTo(3); + assertThat(after.get("NAME")).isEqualTo("Text-3"); + } + } + + private void shouldApplyTableExclusionsConfiguration(boolean useLegacyOption) throws Exception { + Field option = OracleConnectorConfig.TABLE_EXCLUDE_LIST; + if (useLegacyOption) { + option = OracleConnectorConfig.TABLE_BLACKLIST; + } + + boolean includeDdlChanges = true; + if (TestHelper.adapter().equals(OracleConnectorConfig.ConnectorAdapter.LOG_MINER)) { + // LogMiner currently does not support DDL changes during streaming phase + includeDdlChanges = false; + } + + Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.SCHEMA_INCLUDE_LIST, "DEBEZIUM") + .with(option, "DEBEZIUM\\.TABLE2,DEBEZIUM\\.CUSTOMER.*") + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .build(); + + start(OracleConnector.class, config); + assertConnectorIsRunning(); + + waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + connection.execute("INSERT INTO debezium.table1 VALUES (1, 'Text-1')"); + connection.execute("INSERT INTO debezium.table2 VALUES (2, 'Text-2')"); + connection.execute("COMMIT"); + + if (includeDdlChanges) { + String ddl = "CREATE TABLE debezium.table3 (" + + " id NUMERIC(9,0) NOT NULL, " + + " name VARCHAR2(1000), " + + " PRIMARY KEY (id)" + + ")"; + + connection.execute(ddl); + connection.execute("GRANT SELECT ON debezium.table3 TO " + TestHelper.getConnectorUserName()); + + connection.execute("INSERT INTO debezium.table3 VALUES (3, 'Text-3')"); + connection.execute("COMMIT"); + } + + SourceRecords records = consumeRecordsByTopic(includeDdlChanges ? 2 : 1); + + List testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE1"); + assertThat(testTableRecords).hasSize(1); + + VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 1); + Struct after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after"); + assertThat(after.get("ID")).isEqualTo(1); + assertThat(after.get("NAME")).isEqualTo("Text-1"); + + testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE2"); + assertThat(testTableRecords).isNull(); + + if (includeDdlChanges) { + testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE3"); + assertThat(testTableRecords).hasSize(1); + + VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 3); + after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after"); + assertThat(after.get("ID")).isEqualTo(3); + assertThat(after.get("NAME")).isEqualTo("Text-3"); + } + } + + private void shouldApplySchemaAndTableExclusionsConfiguration(boolean useLegacyOption) throws Exception { + Field option = OracleConnectorConfig.TABLE_EXCLUDE_LIST; + if (useLegacyOption) { + option = OracleConnectorConfig.TABLE_BLACKLIST; + } + + boolean includeDdlChanges = true; + if (TestHelper.adapter().equals(OracleConnectorConfig.ConnectorAdapter.LOG_MINER)) { + // LogMiner currently does not support DDL changes during streaming phase + includeDdlChanges = false; + } + + Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.SCHEMA_EXCLUDE_LIST, "DEBEZIUM,SYS") + .with(option, "DEBEZIUM\\.TABLE2,DEBEZIUM\\.CUSTOMER.*") + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .build(); + + start(OracleConnector.class, config); + assertConnectorIsRunning(); + + waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + connection.execute("INSERT INTO debezium.table1 VALUES (1, 'Text-1')"); + connection.execute("INSERT INTO debezium.table2 VALUES (2, 'Text-2')"); + connection.execute("INSERT INTO debezium2.table2 VALUES (1, 'Text2-1')"); + connection.execute("COMMIT"); + + if (includeDdlChanges) { + String ddl = "CREATE TABLE debezium.table3 (" + + " id NUMERIC(9,0) NOT NULL, " + + " name VARCHAR2(1000), " + + " PRIMARY KEY (id)" + + ")"; + + connection.execute(ddl); + connection.execute("GRANT SELECT ON debezium.table3 TO " + TestHelper.getConnectorUserName()); + + connection.execute("INSERT INTO debezium.table3 VALUES (3, 'Text-3')"); + connection.execute("COMMIT"); + } + + SourceRecords records = consumeRecordsByTopic(1); + + List testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE1"); + assertThat(testTableRecords).isNull(); + + testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE2"); + assertThat(testTableRecords).isNull(); + + testTableRecords = records.recordsForTopic("server1.DEBEZIUM2.TABLE2"); + assertThat(testTableRecords).hasSize(1); + + VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 1); + Struct after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after"); + assertThat(after.get("ID")).isEqualTo(1); + assertThat(after.get("NAME")).isEqualTo("Text2-1"); + + if (includeDdlChanges) { + testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE3"); + assertThat(testTableRecords).hasSize(1); + + VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 3); + after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after"); + assertThat(after.get("ID")).isEqualTo(3); + assertThat(after.get("NAME")).isEqualTo("Text-3"); + } } @Test - @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER) public void shouldTakeTimeDifference() throws Exception { Testing.Print.enable(); String stmt = "select current_timestamp from dual"; diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java index 292a647b9..1d8521d48 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java @@ -255,19 +255,6 @@ public void shouldContinueWithStreamingAfterSnapshot() throws Exception { continueStreamingAfterSnapshot(config); } - @Test - @FixFor("DBZ-2607") - @SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.LOGMINER, reason = "Creates a backward compatibility regression") - public void shouldNotRequireDatabaseSchemaConfiguration() throws Exception { - final Map configMap = TestHelper.defaultConfig() - .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER") - .build() - .asMap(); - configMap.remove(OracleConnectorConfig.SCHEMA_NAME.name()); - - continueStreamingAfterSnapshot(Configuration.from(configMap)); - } - private void continueStreamingAfterSnapshot(Configuration config) throws Exception { int expectedRecordCount = 0; connection.execute("INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018/02/22', 'yyyy-mm-dd'))"); @@ -658,7 +645,7 @@ public void deleteWithoutTombstone() throws Exception { } @Test - @SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.LOGMINER, reason = "Seems to get caught in loop?") + @SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.LOGMINER, reason = "LogMiner does not yet support DDL during streaming") public void shouldReadChangeStreamForTableCreatedWhileStreaming() throws Exception { TestHelper.dropTable(connection, "debezium.customer2"); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerDmlParserTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerDmlParserTest.java index 5ee538925..fb4ecd8a9 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerDmlParserTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerDmlParserTest.java @@ -42,7 +42,7 @@ public void testParsingInsert() throws Exception { "('1','Acme',TO_TIMESTAMP('2020-02-01 00:00:00.'),Unsupported Type," + "TO_DATE('2020-02-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS'),Unsupported Type,NULL);"; - LogMinerDmlEntry entry = fastDmlParser.parse(sql, null, null); + LogMinerDmlEntry entry = fastDmlParser.parse(sql, null, null, null); assertThat(entry.getCommandType()).isEqualTo(Operation.CREATE); assertThat(entry.getOldValues()).isEmpty(); assertThat(entry.getNewValues()).hasSize(7); @@ -72,7 +72,7 @@ public void testParsingUpdate() throws Exception { "\"UT\" = Unsupported Type and \"DATE\" = TO_DATE('2020-02-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS') and " + "\"UT2\" = Unsupported Type and \"C1\" = NULL;"; - LogMinerDmlEntry entry = fastDmlParser.parse(sql, null, null); + LogMinerDmlEntry entry = fastDmlParser.parse(sql, null, null, null); assertThat(entry.getCommandType()).isEqualTo(Operation.UPDATE); assertThat(entry.getOldValues()).hasSize(7); assertThat(entry.getOldValues().get(0).getColumnName()).isEqualTo("ID"); @@ -112,7 +112,7 @@ public void testParsingDelete() throws Exception { "where \"ID\" = '1' and \"NAME\" = 'Acme' and \"TS\" = TO_TIMESTAMP('2020-02-01 00:00:00.') and " + "\"UT\" = Unsupported Type and \"DATE\" = TO_DATE('2020-02-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS');"; - LogMinerDmlEntry entry = fastDmlParser.parse(sql, null, null); + LogMinerDmlEntry entry = fastDmlParser.parse(sql, null, null, null); assertThat(entry.getCommandType()).isEqualTo(Operation.DELETE); assertThat(entry.getOldValues()).hasSize(5); assertThat(entry.getOldValues().get(0).getColumnName()).isEqualTo("ID"); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/OracleDmlParserTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/OracleDmlParserTest.java index 47fa97d3d..3b2d687a2 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/OracleDmlParserTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/OracleDmlParserTest.java @@ -38,6 +38,7 @@ import io.debezium.connector.oracle.util.TestHelper; import io.debezium.data.Envelope; import io.debezium.doc.FixFor; +import io.debezium.relational.TableId; import io.debezium.relational.Tables; import io.debezium.util.IoUtil; @@ -57,6 +58,7 @@ public class OracleDmlParserTest { private static final String CATALOG_NAME = "ORCLPDB1"; private static final String SCHEMA_NAME = "DEBEZIUM"; private static final String FULL_TABLE_NAME = SCHEMA_NAME + "\".\"" + TABLE_NAME; + private static final TableId TABLE_ID = TableId.parse(CATALOG_NAME + "." + SCHEMA_NAME + "." + TABLE_NAME); private static final String SPATIAL_DATA = "SDO_GEOMETRY(2003, NULL, NULL, SDO_ELEM_INFO_ARRAY(1, 1003, 1), SDO_ORDINATE_ARRAY" + "(102604.878, 85772.8286, 101994.879, 85773.6633, 101992.739, 84209.6648, 102602.738, 84208.83, 102604.878, 85772.8286))"; private static final String SPATIAL_DATA_1 = "'unsupported type'"; @@ -72,7 +74,7 @@ public void setUp() { ddlParser = new OracleDdlParser(true, CATALOG_NAME, SCHEMA_NAME); antlrDmlParser = new OracleDmlParser(true, CATALOG_NAME, SCHEMA_NAME, converters); - sqlDmlParser = new SimpleDmlParser(CATALOG_NAME, SCHEMA_NAME, converters); + sqlDmlParser = new SimpleDmlParser(CATALOG_NAME, converters); tables = new Tables(); CLOB_DATA = StringUtils.repeat("clob_", 4000); @@ -98,7 +100,7 @@ public void shouldParseAliasUpdate() throws Exception { LogMinerDmlEntry record = antlrDmlParser.getDmlEntry(); verifyUpdate(record, false, true, 9); - record = sqlDmlParser.parse(dml, tables, "1"); + record = sqlDmlParser.parse(dml, tables, TABLE_ID, "1"); verifyUpdate(record, false, true, 9); } @@ -136,7 +138,7 @@ public void shouldParseDateFormats() throws Exception { private void parseDate(String format, boolean validateDate) { String dml = "update \"" + FULL_TABLE_NAME + "\" a set a.\"col7\" = " + format + ", a.\"col13\" = " + format + " where a.ID = 1;"; - LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables, "1"); + LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables, TABLE_ID, "1"); assertThat(record).isNotNull(); assertThat(record.getNewValues()).isNotEmpty(); assertThat(record.getOldValues()).isNotEmpty(); @@ -154,7 +156,7 @@ private void parseTimestamp(String format, boolean validateTimestamp) { "and a.COL3 = 'text' and a.COL4 IS NULL and a.\"COL5\" IS NULL and a.COL6 IS NULL " + "and a.COL8 = " + format + " and a.col11 is null;"; - LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables, "1"); + LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables, TABLE_ID, "1"); assertThat(record.getNewValues()).isNotEmpty(); assertThat(record.getOldValues()).isNotEmpty(); @@ -174,7 +176,7 @@ public void shouldParseAliasInsert() throws Exception { LogMinerDmlEntry record = antlrDmlParser.getDmlEntry(); verifyInsert(record); - record = sqlDmlParser.parse(dml, tables, "1"); + record = sqlDmlParser.parse(dml, tables, TABLE_ID, "1"); verifyInsert(record); } @@ -190,7 +192,7 @@ public void shouldParseAliasDelete() throws Exception { LogMinerDmlEntry record = antlrDmlParser.getDmlEntry(); verifyDelete(record, true); - record = sqlDmlParser.parse(dml, tables, "1"); + record = sqlDmlParser.parse(dml, tables, TABLE_ID, "1"); verifyDelete(record, true); } @@ -209,7 +211,7 @@ public void shouldParseNoWhereClause() throws Exception { LogMinerDmlEntry record = antlrDmlParser.getDmlEntry(); verifyUpdate(record, false, false, 9); - record = sqlDmlParser.parse(dml, tables, "1"); + record = sqlDmlParser.parse(dml, tables, TABLE_ID, "1"); verifyUpdate(record, false, false, 9); dml = "delete from \"" + FULL_TABLE_NAME + "\" a "; @@ -217,7 +219,7 @@ record = sqlDmlParser.parse(dml, tables, "1"); record = antlrDmlParser.getDmlEntry(); verifyDelete(record, false); - record = sqlDmlParser.parse(dml, tables, "1"); + record = sqlDmlParser.parse(dml, tables, TABLE_ID, "1"); verifyDelete(record, false); } @@ -233,7 +235,7 @@ public void shouldParseInsertAndDeleteTable() throws Exception { LogMinerDmlEntry record = antlrDmlParser.getDmlEntry(); verifyInsert(record); - record = sqlDmlParser.parse(dml, tables, "1"); + record = sqlDmlParser.parse(dml, tables, TABLE_ID, "1"); verifyInsert(record); dml = "delete from \"" + FULL_TABLE_NAME + @@ -243,7 +245,7 @@ record = sqlDmlParser.parse(dml, tables, "1"); record = antlrDmlParser.getDmlEntry(); verifyDelete(record, true); - record = sqlDmlParser.parse(dml, tables, ""); + record = sqlDmlParser.parse(dml, tables, TABLE_ID, ""); verifyDelete(record, true); } @@ -264,7 +266,7 @@ public void shouldParseUpdateTable() throws Exception { LogMinerDmlEntry record = antlrDmlParser.getDmlEntry(); // verifyUpdate(record, true, true); - record = sqlDmlParser.parse(dml, tables, ""); + record = sqlDmlParser.parse(dml, tables, TABLE_ID, ""); verifyUpdate(record, true, true, 11); dml = "update \"" + FULL_TABLE_NAME @@ -274,7 +276,7 @@ record = sqlDmlParser.parse(dml, tables, ""); "and COL3 = 'text' and COL4 IS NULL and \"COL5\" IS NULL and COL6 IS NULL " + "and COL8 = TO_TIMESTAMP('2019-05-14 02:28:32') and col11 = " + SPATIAL_DATA + ";"; - record = sqlDmlParser.parse(dml, tables, ""); + record = sqlDmlParser.parse(dml, tables, TABLE_ID, ""); } @Test @@ -289,7 +291,7 @@ public void shouldParseUpdateNoChangesTable() throws Exception { + "and COL8 = TO_TIMESTAMP('2019-05-14 02:28:32') and col11 = " + SPATIAL_DATA + ";"; - LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables, ""); + LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables, TABLE_ID, ""); boolean pass = record.getCommandType().equals(Envelope.Operation.UPDATE) && record.getOldValues().size() == record.getNewValues().size() && record.getNewValues().containsAll(record.getOldValues()); @@ -308,7 +310,7 @@ public void shouldParseSpecialCharacters() throws Exception { antlrDmlParser.parse(dml, tables); assertThat(antlrDmlParser.getDmlEntry()).isNotNull(); - LogMinerDmlEntry result = sqlDmlParser.parse(dml, tables, "1"); + LogMinerDmlEntry result = sqlDmlParser.parse(dml, tables, TABLE_ID, "1"); assertThat(result).isNotNull(); LogMinerColumnValue value = result.getNewValues().get(2); assertThat(value.getColumnData().toString()).contains("\\"); @@ -319,7 +321,7 @@ public void shouldParseSpecialCharacters() throws Exception { antlrDmlParser.parse(dml, tables); assertThat(antlrDmlParser.getDmlEntry()).isNotNull(); - result = sqlDmlParser.parse(dml, tables, ""); + result = sqlDmlParser.parse(dml, tables, TABLE_ID, ""); assertThat(result).isNotNull(); value = result.getOldValues().get(3); assertThat(value.getColumnData().toString()).contains("\\"); @@ -330,41 +332,42 @@ public void shouldParseStrangeDml() throws Exception { String createStatement = IoUtil.read(IoUtil.getResourceAsStream("ddl/create_table.sql", null, getClass(), null, null)); ddlParser.parse(createStatement, tables); String dml = null; - LogMinerDmlEntry result = sqlDmlParser.parse(dml, tables, ""); + LogMinerDmlEntry result = sqlDmlParser.parse(dml, tables, TABLE_ID, ""); assertThat(result).isNull(); dml = "select * from test;null;"; - assertDmlParserException(dml, sqlDmlParser, tables, ""); + assertDmlParserException(dml, sqlDmlParser, tables, TABLE_ID, ""); + assertThat(result).isNull(); dml = "full dummy mess"; - assertDmlParserException(dml, sqlDmlParser, tables, ""); + assertDmlParserException(dml, sqlDmlParser, tables, TABLE_ID, ""); dml = "delete from non_exiting_table " + " where id = 6 and col1 = 2 and col2 = 'te\\xt' and col3 = 'tExt\\' and col4 is null and col5 is null " + " and col6 is null and col8 is null and col9 is null and col10 is null and col11 is null and col12 is null"; - assertDmlParserException(dml, sqlDmlParser, tables, ""); + assertDmlParserException(dml, sqlDmlParser, tables, TABLE_ID, ""); Update update = mock(Update.class); Mockito.when(update.getTables()).thenReturn(new ArrayList<>()); dml = "update \"" + FULL_TABLE_NAME + "\" set col1 = 3 " + " where id = 6 and col1 = 2 and col2 = 'te\\xt' and col3 = 'tExt\\' and col4 is null and col5 is null " + " and col6 is null and col8 is null and col9 is null and col10 is null and col11 is null and col12 is null and col20 is null"; - result = sqlDmlParser.parse(dml, tables, ""); + result = sqlDmlParser.parse(dml, tables, TABLE_ID, ""); assertThat(result.getOldValues().size()).isEqualTo(12); assertThat(result.getOldValues().size() == 12).isTrue(); dml = "update \"" + FULL_TABLE_NAME + "\" set col1 = 3 " + " where id = 6 and col1 = 2 and col2 = 'te\\xt' and col30 = 'tExt\\' and col4 is null and col5 is null " + " and col6 is null and col8 is null and col9 is null and col10 is null and col11 is null and col21 is null"; - result = sqlDmlParser.parse(dml, tables, ""); + result = sqlDmlParser.parse(dml, tables, TABLE_ID, ""); assertThat(result.getNewValues().size()).isEqualTo(14); dml = "update table1, \"" + FULL_TABLE_NAME + "\" set col1 = 3 " + " where id = 6 and col1 = 2 and col2 = 'te\\xt' and col3 = 'tExt\\' and col4 is null and col5 is null " + " and col6 is null and col8 is null and col9 is null and col10 is null and col11 is null and col12 is null and col20 is null"; - assertDmlParserException(dml, sqlDmlParser, tables, ""); + assertDmlParserException(dml, sqlDmlParser, tables, TABLE_ID, ""); } - private void assertDmlParserException(String sql, DmlParser parser, Tables tables, String txId) { + private void assertDmlParserException(String sql, DmlParser parser, Tables tables, TableId tableId, String txId) { try { - LogMinerDmlEntry dml = parser.parse(sql, tables, txId); + LogMinerDmlEntry dml = parser.parse(sql, tables, tableId, txId); } catch (Exception e) { assertThat(e).isInstanceOf(DmlParserException.class); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/RowMapperTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/RowMapperTest.java index c25ba1898..272fc9bee 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/RowMapperTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/RowMapperTest.java @@ -24,6 +24,7 @@ import org.junit.rules.TestRule; import org.mockito.Mockito; +import io.debezium.DebeziumException; import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule; import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot; import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot.AdapterName; @@ -171,7 +172,7 @@ public void testGetTableId() throws SQLException { tableId = RowMapper.getTableId("catalog", rs); fail("RowMapper should not have returned a TableId"); } - catch (SQLException e) { + catch (DebeziumException e) { assertThat(tableId).isNull(); } } @@ -191,7 +192,7 @@ public void testGetTableIdWithVariedCase() throws SQLException { tableId = RowMapper.getTableId("catalog", rs); fail("RowMapper should not have returned a TableId"); } - catch (SQLException e) { + catch (DebeziumException e) { assertThat(tableId).isNull(); } } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/SqlUtilsTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/SqlUtilsTest.java index 4df34568a..2a5544e64 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/SqlUtilsTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/SqlUtilsTest.java @@ -12,8 +12,6 @@ import java.sql.SQLRecoverableException; import java.time.Duration; import java.time.LocalDateTime; -import java.util.HashSet; -import java.util.Set; import org.junit.Rule; import org.junit.Test; @@ -21,10 +19,10 @@ import org.mockito.Mockito; import io.debezium.connector.oracle.OracleConnectorConfig; -import io.debezium.connector.oracle.OracleDatabaseSchema; import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule; import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot; import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot.AdapterName; +import io.debezium.doc.FixFor; import io.debezium.relational.TableId; @SkipWhenAdapterNameIsNot(value = AdapterName.LOGMINER) @@ -33,6 +31,143 @@ public class SqlUtilsTest { @Rule public TestRule skipRule = new SkipTestDependingOnAdapterNameRule(); + private static final String LOG_MINER_CONTENT_QUERY_TEMPLATE = "SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME " + + + "FROM V$LOGMNR_CONTENTS WHERE OPERATION_CODE IN (1,2,3,5) AND SCN >= ? AND SCN < ? " + + "AND TABLE_NAME != '" + SqlUtils.LOGMNR_FLUSH_TABLE + "' " + + "${schemaPredicate}" + + "${tablePredicate}" + + "OR (OPERATION_CODE IN (5,34) AND USERNAME NOT IN ('SYS','SYSTEM','${user}')) " + + "OR (OPERATION_CODE IN (7,36))"; + + private static final String USERNAME = "USERNAME"; + + @Test + @FixFor("DBZ-3009") + public void testLogMinerQueryWithNoFilters() { + OracleConnectorConfig config = mock(OracleConnectorConfig.class); + Mockito.when(config.schemaIncludeList()).thenReturn(null); + Mockito.when(config.schemaExcludeList()).thenReturn(null); + Mockito.when(config.tableIncludeList()).thenReturn(null); + Mockito.when(config.tableExcludeList()).thenReturn(null); + + String result = SqlUtils.logMinerContentsQuery(config, USERNAME); + assertThat(result).isEqualTo(resolveLogMineryContentQueryFromTemplate(null, null)); + } + + @Test + @FixFor("DBZ-3009") + public void testLogMinerQueryWithSchemaInclude() { + OracleConnectorConfig config = mock(OracleConnectorConfig.class); + Mockito.when(config.schemaIncludeList()).thenReturn("SCHEMA1,SCHEMA2"); + Mockito.when(config.schemaExcludeList()).thenReturn(null); + Mockito.when(config.tableIncludeList()).thenReturn(null); + Mockito.when(config.tableExcludeList()).thenReturn(null); + + String schema = "AND (REGEXP_LIKE(SEG_OWNER,'^SCHEMA1$','i') OR REGEXP_LIKE(SEG_OWNER,'^SCHEMA2$','i')) "; + + String result = SqlUtils.logMinerContentsQuery(config, USERNAME); + assertThat(result).isEqualTo(resolveLogMineryContentQueryFromTemplate(schema, null)); + } + + @Test + @FixFor("DBZ-3009") + public void testLogMinerQueryWithSchemaExclude() { + OracleConnectorConfig config = mock(OracleConnectorConfig.class); + Mockito.when(config.schemaIncludeList()).thenReturn(null); + Mockito.when(config.schemaExcludeList()).thenReturn("SCHEMA1,SCHEMA2"); + Mockito.when(config.tableIncludeList()).thenReturn(null); + Mockito.when(config.tableExcludeList()).thenReturn(null); + + String schema = "AND (NOT REGEXP_LIKE(SEG_OWNER,'^SCHEMA1$','i') AND NOT REGEXP_LIKE(SEG_OWNER,'^SCHEMA2$','i')) "; + + String result = SqlUtils.logMinerContentsQuery(config, USERNAME); + assertThat(result).isEqualTo(resolveLogMineryContentQueryFromTemplate(schema, null)); + } + + @Test + @FixFor("DBZ-3009") + public void testLogMinerQueryWithTableInclude() { + OracleConnectorConfig config = mock(OracleConnectorConfig.class); + Mockito.when(config.schemaIncludeList()).thenReturn(null); + Mockito.when(config.schemaExcludeList()).thenReturn(null); + Mockito.when(config.tableIncludeList()).thenReturn("DEBEZIUM\\.TABLEA,DEBEZIUM\\.TABLEB"); + Mockito.when(config.tableExcludeList()).thenReturn(null); + + String table = "AND (REGEXP_LIKE(SEG_OWNER || '.' || TABLE_NAME,'^DEBEZIUM\\.TABLEA$','i') " + + "OR REGEXP_LIKE(SEG_OWNER || '.' || TABLE_NAME,'^DEBEZIUM\\.TABLEB$','i')) "; + + String result = SqlUtils.logMinerContentsQuery(config, USERNAME); + assertThat(result).isEqualTo(resolveLogMineryContentQueryFromTemplate(null, table)); + } + + @Test + @FixFor("DBZ-3009") + public void testLogMinerQueryWithTableExcludes() { + OracleConnectorConfig config = mock(OracleConnectorConfig.class); + Mockito.when(config.schemaIncludeList()).thenReturn(null); + Mockito.when(config.schemaExcludeList()).thenReturn(null); + Mockito.when(config.tableIncludeList()).thenReturn(null); + Mockito.when(config.tableExcludeList()).thenReturn("DEBEZIUM\\.TABLEA,DEBEZIUM\\.TABLEB"); + + String table = "AND (NOT REGEXP_LIKE(SEG_OWNER || '.' || TABLE_NAME,'^DEBEZIUM\\.TABLEA$','i') " + + "AND NOT REGEXP_LIKE(SEG_OWNER || '.' || TABLE_NAME,'^DEBEZIUM\\.TABLEB$','i')) "; + + String result = SqlUtils.logMinerContentsQuery(config, USERNAME); + assertThat(result).isEqualTo(resolveLogMineryContentQueryFromTemplate(null, table)); + } + + @Test + @FixFor("DBZ-3009") + public void testLogMinerQueryWithSchemaTableIncludes() { + OracleConnectorConfig config = mock(OracleConnectorConfig.class); + Mockito.when(config.schemaIncludeList()).thenReturn("SCHEMA1,SCHEMA2"); + Mockito.when(config.schemaExcludeList()).thenReturn(null); + Mockito.when(config.tableIncludeList()).thenReturn("DEBEZIUM\\.TABLEA,DEBEZIUM\\.TABLEB"); + Mockito.when(config.tableExcludeList()).thenReturn(null); + + String schema = "AND (REGEXP_LIKE(SEG_OWNER,'^SCHEMA1$','i') OR REGEXP_LIKE(SEG_OWNER,'^SCHEMA2$','i')) "; + String table = "AND (REGEXP_LIKE(SEG_OWNER || '.' || TABLE_NAME,'^DEBEZIUM\\.TABLEA$','i') " + + "OR REGEXP_LIKE(SEG_OWNER || '.' || TABLE_NAME,'^DEBEZIUM\\.TABLEB$','i')) "; + + String result = SqlUtils.logMinerContentsQuery(config, USERNAME); + assertThat(result).isEqualTo(resolveLogMineryContentQueryFromTemplate(schema, table)); + } + + @Test + @FixFor("DBZ-3009") + public void testLogMinerQueryWithSchemaTableExcludes() { + OracleConnectorConfig config = mock(OracleConnectorConfig.class); + Mockito.when(config.schemaIncludeList()).thenReturn(null); + Mockito.when(config.schemaExcludeList()).thenReturn("SCHEMA1,SCHEMA2"); + Mockito.when(config.tableIncludeList()).thenReturn(null); + Mockito.when(config.tableExcludeList()).thenReturn("DEBEZIUM\\.TABLEA,DEBEZIUM\\.TABLEB"); + + String schema = "AND (NOT REGEXP_LIKE(SEG_OWNER,'^SCHEMA1$','i') AND NOT REGEXP_LIKE(SEG_OWNER,'^SCHEMA2$','i')) "; + String table = "AND (NOT REGEXP_LIKE(SEG_OWNER || '.' || TABLE_NAME,'^DEBEZIUM\\.TABLEA$','i') " + + "AND NOT REGEXP_LIKE(SEG_OWNER || '.' || TABLE_NAME,'^DEBEZIUM\\.TABLEB$','i')) "; + + String result = SqlUtils.logMinerContentsQuery(config, USERNAME); + assertThat(result).isEqualTo(resolveLogMineryContentQueryFromTemplate(schema, table)); + } + + @Test + @FixFor("DBZ-3009") + public void testLogMinerQueryWithSchemaExcludeTableInclude() { + OracleConnectorConfig config = mock(OracleConnectorConfig.class); + Mockito.when(config.schemaIncludeList()).thenReturn(null); + Mockito.when(config.schemaExcludeList()).thenReturn("SCHEMA1,SCHEMA2"); + Mockito.when(config.tableIncludeList()).thenReturn("DEBEZIUM\\.TABLEA,DEBEZIUM\\.TABLEB"); + Mockito.when(config.tableExcludeList()).thenReturn(null); + + String schema = "AND (NOT REGEXP_LIKE(SEG_OWNER,'^SCHEMA1$','i') AND NOT REGEXP_LIKE(SEG_OWNER,'^SCHEMA2$','i')) "; + String table = "AND (REGEXP_LIKE(SEG_OWNER || '.' || TABLE_NAME,'^DEBEZIUM\\.TABLEA$','i') " + + "OR REGEXP_LIKE(SEG_OWNER || '.' || TABLE_NAME,'^DEBEZIUM\\.TABLEB$','i')) "; + + String result = SqlUtils.logMinerContentsQuery(config, USERNAME); + assertThat(result).isEqualTo(resolveLogMineryContentQueryFromTemplate(schema, table)); + } + @Test public void testStatements() { SqlUtils.setRac(false); @@ -41,28 +176,6 @@ public void testStatements() { String expected = "BEGIN sys.dbms_logmnr.add_logfile(LOGFILENAME => 'FILENAME', OPTIONS => ADD);END;"; assertThat(expected.equals(result)).isTrue(); - OracleDatabaseSchema schema = mock(OracleDatabaseSchema.class); - TableId table1 = new TableId("catalog", "schema", "table1"); - TableId table2 = new TableId("catalog", "schema", "table2"); - Set tables = new HashSet<>(); - Mockito.when(schema.tableIds()).thenReturn(tables); - result = SqlUtils.logMinerContentsQuery("DATABASE", "SCHEMA", schema); - expected = "SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME " + - "FROM V$LOGMNR_CONTENTS WHERE OPERATION_CODE in (1,2,3,5) AND SEG_OWNER = 'DATABASE' AND SCN >= ? AND SCN < ? " + - "OR (OPERATION_CODE IN (5,34) AND USERNAME NOT IN ('SYS','SYSTEM','SCHEMA')) " + - " OR (OPERATION_CODE IN (7,36))"; - assertThat(result).isEqualTo(expected); - - tables.add(table1); - tables.add(table2); - result = SqlUtils.logMinerContentsQuery("DATABASE", "SCHEMA", schema); - expected = "SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME " + - "FROM V$LOGMNR_CONTENTS WHERE OPERATION_CODE in (1,2,3,5) " + - "AND SEG_OWNER = 'DATABASE' AND table_name IN ('table1','table2') " + - "AND SCN >= ? AND SCN < ? OR (OPERATION_CODE IN (5,34) AND USERNAME NOT IN ('SYS','SYSTEM','SCHEMA')) " + - " OR (OPERATION_CODE IN (7,36))"; - assertThat(result).isEqualTo(expected); - result = SqlUtils.databaseSupplementalLoggingMinCheckQuery(); expected = "SELECT 'KEY', SUPPLEMENTAL_LOG_DATA_MIN FROM V$DATABASE"; assertThat(result).isEqualTo(expected); @@ -201,4 +314,12 @@ public void shouldDetectConnectionProblems() { assertThat(SqlUtils.connectionProblem(new Exception("12543 problem"))).isFalse(); } + + private String resolveLogMineryContentQueryFromTemplate(String schemaReplacement, String tableReplacement) { + String query = LOG_MINER_CONTENT_QUERY_TEMPLATE; + query = query.replace("${schemaPredicate}", schemaReplacement == null ? "" : schemaReplacement); + query = query.replace("${tablePredicate}", tableReplacement == null ? "" : tableReplacement); + query = query.replace("${user}", USERNAME); + return query; + } } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/ValueHolderTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/ValueHolderTest.java index c2beb4cff..c8c806355 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/ValueHolderTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/ValueHolderTest.java @@ -33,6 +33,7 @@ import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntryImpl; import io.debezium.connector.oracle.util.TestHelper; import io.debezium.data.Envelope; +import io.debezium.relational.TableId; import io.debezium.relational.Tables; import io.debezium.util.IoUtil; @@ -45,6 +46,7 @@ public class ValueHolderTest { private SimpleDmlParser sqlDmlParser; private Tables tables; private static final String FULL_TABLE_NAME = SCHEMA_NAME + "\".\"" + TABLE_NAME; + private static final TableId TABLE_ID = TableId.parse(CATALOG_NAME + "." + SCHEMA_NAME + "." + TABLE_NAME); @Rule public TestRule skipRule = new SkipTestDependingOnAdapterNameRule(); @@ -53,7 +55,7 @@ public class ValueHolderTest { public void setUp() { OracleValueConverters converters = new OracleValueConverters(new OracleConnectorConfig(TestHelper.defaultConfig().build()), null); ddlParser = new OracleDdlParser(true, CATALOG_NAME, SCHEMA_NAME); - sqlDmlParser = new SimpleDmlParser(CATALOG_NAME, SCHEMA_NAME, converters); + sqlDmlParser = new SimpleDmlParser(CATALOG_NAME, converters); tables = new Tables(); } @@ -81,7 +83,7 @@ public void testValueHolders() throws Exception { ddlParser.parse(createStatement, tables); String dml = "insert into \"" + FULL_TABLE_NAME + "\" (\"column1\",\"column2\") values ('5','Text');"; - LogMinerDmlEntry dmlEntryParsed = sqlDmlParser.parse(dml, tables, "1"); + LogMinerDmlEntry dmlEntryParsed = sqlDmlParser.parse(dml, tables, TABLE_ID, "1"); assertThat(dmlEntryParsed.equals(dmlEntryExpected)).isTrue(); assertThat(dmlEntryExpected.getCommandType() == Envelope.Operation.CREATE).isTrue(); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java index 1c0e1680b..743df9365 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java @@ -97,7 +97,6 @@ public static Configuration.Builder defaultConfig() { return builder.with(OracleConnectorConfig.SERVER_NAME, SERVER_NAME) .with(OracleConnectorConfig.PDB_NAME, "ORCLPDB1") .with(OracleConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class) - .with(OracleConnectorConfig.SCHEMA_NAME, SCHEMA_USER) .with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH) .with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, false); }