DBZ-3009 Support multiple schemas with Oracle LogMiner

Authored by Chris Cranford, 2021-02-19 02:51:32 -05:00; committed by Gunnar Morling
parent 85cfdd2236
commit 461b784974
18 changed files with 621 additions and 358 deletions

View File

@ -76,14 +76,6 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
.withDescription("Name of the pluggable database when working with a multi-tenant set-up. "
+ "The CDB name must be given via " + DATABASE_NAME.name() + " in this case.");
public static final Field SCHEMA_NAME = Field.create(DATABASE_CONFIG_PREFIX + "schema")
.withDisplayName("Schema name")
.withType(Type.STRING)
.withWidth(Width.MEDIUM)
.withImportance(Importance.HIGH)
.withValidation(OracleConnectorConfig::validateDatabaseSchema)
.withDescription("Name of the connection user to the database ");
public static final Field XSTREAM_SERVER_NAME = Field.create(DATABASE_CONFIG_PREFIX + "out.server.name")
.withDisplayName("XStream out server name")
.withType(Type.STRING)
@ -314,7 +306,6 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
Heartbeat.HEARTBEAT_TOPICS_PREFIX,
TABLENAME_CASE_INSENSITIVE,
ORACLE_VERSION,
SCHEMA_NAME,
CONNECTOR_ADAPTER,
LOG_MINING_STRATEGY,
SNAPSHOT_ENHANCEMENT_TOKEN,
@ -341,7 +332,6 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
private final boolean tablenameCaseInsensitive;
private final OracleVersion oracleVersion;
private final String schemaName;
private final Tables.ColumnNameFilter columnFilter;
private final HistoryRecorder logMiningHistoryRecorder;
private final Configuration jdbcConfig;
@ -374,7 +364,6 @@ public OracleConnectorConfig(Configuration config) {
this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE));
this.tablenameCaseInsensitive = config.getBoolean(TABLENAME_CASE_INSENSITIVE);
this.oracleVersion = OracleVersion.parse(config.getString(ORACLE_VERSION));
this.schemaName = toUpperCase(config.getString(SCHEMA_NAME));
String blacklistedColumns = toUpperCase(config.getString(RelationalDatabaseConnectorConfig.COLUMN_BLACKLIST));
this.columnFilter = getColumnNameFilter(blacklistedColumns);
this.logMiningHistoryRecorder = resolveLogMiningHistoryRecorder(config);
@ -429,7 +418,7 @@ public static ConfigDef configDef() {
Field.group(config, "Oracle", HOSTNAME, PORT, RelationalDatabaseConnectorConfig.USER,
RelationalDatabaseConnectorConfig.PASSWORD, SERVER_NAME, RelationalDatabaseConnectorConfig.DATABASE_NAME, PDB_NAME,
XSTREAM_SERVER_NAME, SNAPSHOT_MODE, CONNECTOR_ADAPTER, LOG_MINING_STRATEGY, URL, TABLENAME_CASE_INSENSITIVE, ORACLE_VERSION, SCHEMA_NAME);
XSTREAM_SERVER_NAME, SNAPSHOT_MODE, CONNECTOR_ADAPTER, LOG_MINING_STRATEGY, URL, TABLENAME_CASE_INSENSITIVE, ORACLE_VERSION);
Field.group(config, "History Storage", KafkaDatabaseHistory.BOOTSTRAP_SERVERS,
KafkaDatabaseHistory.TOPIC, KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS,
KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY);
@ -461,6 +450,10 @@ public String getPdbName() {
return pdbName;
}
public String getCatalogName() {
return pdbName != null ? pdbName : databaseName;
}
public String getXoutServerName() {
return xoutServerName;
}
@ -477,10 +470,6 @@ public OracleVersion getOracleVersion() {
return oracleVersion;
}
public String getSchemaName() {
return schemaName;
}
public Tables.ColumnNameFilter getColumnFilter() {
return columnFilter;
}
@ -943,19 +932,6 @@ public String getConnectorName() {
return Module.name();
}
public static int validateDatabaseSchema(Configuration config, Field field, ValidationOutput problems) {
if (ConnectorAdapter.LOG_MINER.equals(ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER)))) {
final String schemaName = config.getString(SCHEMA_NAME);
if (schemaName == null || schemaName.trim().length() == 0) {
problems.accept(SCHEMA_NAME, schemaName, "The '" + SCHEMA_NAME.name() + "' be provided when using the LogMiner connection adapter");
return 1;
}
}
// Everything checks out ok.
return 0;
}
public static int validateOutServerName(Configuration config, Field field, ValidationOutput problems) {
if (ConnectorAdapter.XSTREAM.equals(ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER)))) {
return Field.isRequired(config, field, problems);
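
With the dedicated database.schema option removed, schema scoping for the LogMiner adapter now flows through the generic include/exclude list options exercised by the updated tests further down. A minimal sketch of such a configuration (connection values are placeholders and not part of this commit; the list patterns mirror the ones used in the new tests):

import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnectorConfig;

public class MultiSchemaConfigSketch {
    // Illustrative only: builds a connector configuration that captures tables
    // from two schemas via the standard include-list options.
    public static Configuration build() {
        return Configuration.create()
                .with(OracleConnectorConfig.SERVER_NAME, "server1")
                .with(OracleConnectorConfig.HOSTNAME, "localhost")
                .with(OracleConnectorConfig.DATABASE_NAME, "ORCLCDB")
                .with(OracleConnectorConfig.PDB_NAME, "ORCLPDB1")
                .with(OracleConnectorConfig.USER, "c##dbzuser")
                .with(OracleConnectorConfig.SCHEMA_INCLUDE_LIST, "DEBEZIUM,DEBEZIUM2")
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.TABLE1,DEBEZIUM2\\.TABLE2")
                .build();
    }
}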

View File

@ -74,13 +74,12 @@ protected SnapshotContext prepare(ChangeEventSourceContext context) throws Excep
jdbcConnection.setSessionToPdb(connectorConfig.getPdbName());
}
return new OracleSnapshotContext(
connectorConfig.getPdbName() != null ? connectorConfig.getPdbName() : connectorConfig.getDatabaseName());
return new OracleSnapshotContext(connectorConfig.getCatalogName());
}
@Override
protected Set<TableId> getAllTableIds(RelationalSnapshotContext ctx) throws Exception {
return jdbcConnection.getAllTableIds(ctx.catalogName, connectorConfig.getSchemaName(), false);
return jdbcConnection.getAllTableIds(ctx.catalogName, null, false);
// this is a very slow approach (commented out); it took 30 minutes on an instance with 600 tables
// return jdbcConnection.readTableNames(ctx.catalogName, null, null, new String[] {"TABLE"} );
}

View File

@ -49,7 +49,7 @@ class LogMinerQueryResultProcessor {
private final OracleDatabaseSchema schema;
private final EventDispatcher<TableId> dispatcher;
private final TransactionalBufferMetrics transactionalBufferMetrics;
private final String catalogName;
private final OracleConnectorConfig connectorConfig;
private final Clock clock;
private final Logger LOGGER = LoggerFactory.getLogger(LogMinerQueryResultProcessor.class);
private long currentOffsetScn = 0;
@ -62,7 +62,7 @@ class LogMinerQueryResultProcessor {
TransactionalBuffer transactionalBuffer,
OracleOffsetContext offsetContext, OracleDatabaseSchema schema,
EventDispatcher<TableId> dispatcher,
String catalogName, Clock clock, HistoryRecorder historyRecorder) {
Clock clock, HistoryRecorder historyRecorder) {
this.context = context;
this.metrics = metrics;
this.transactionalBuffer = transactionalBuffer;
@ -70,16 +70,16 @@ class LogMinerQueryResultProcessor {
this.schema = schema;
this.dispatcher = dispatcher;
this.transactionalBufferMetrics = transactionalBuffer.getMetrics();
this.catalogName = catalogName;
this.clock = clock;
this.historyRecorder = historyRecorder;
this.dmlParser = resolveParser(connectorConfig, catalogName, jdbcConnection);
this.connectorConfig = connectorConfig;
this.dmlParser = resolveParser(connectorConfig, jdbcConnection);
}
private static DmlParser resolveParser(OracleConnectorConfig connectorConfig, String catalogName, OracleConnection connection) {
private static DmlParser resolveParser(OracleConnectorConfig connectorConfig, OracleConnection connection) {
if (connectorConfig.getLogMiningDmlParser().equals(LogMiningDmlParser.LEGACY)) {
OracleValueConverters converter = new OracleValueConverters(connectorConfig, connection);
return new SimpleDmlParser(catalogName, connectorConfig.getSchemaName(), converter);
return new SimpleDmlParser(connectorConfig.getCatalogName(), converter);
}
return new LogMinerDmlParser();
}
@ -175,6 +175,11 @@ int processResult(ResultSet resultSet) {
// DML
if (operationCode == RowMapper.INSERT || operationCode == RowMapper.DELETE || operationCode == RowMapper.UPDATE) {
final TableId tableId = RowMapper.getTableId(connectorConfig.getCatalogName(), resultSet);
if (!connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) {
continue;
}
LOGGER.trace("DML, {}, sql {}", logMessage, redoSql);
dmlCounter++;
switch (operationCode) {
@ -190,7 +195,7 @@ int processResult(ResultSet resultSet) {
}
Instant parseStart = Instant.now();
LogMinerDmlEntry dmlEntry = dmlParser.parse(redoSql, schema.getTables(), txId);
LogMinerDmlEntry dmlEntry = dmlParser.parse(redoSql, schema.getTables(), tableId, txId);
metrics.addCurrentParseTime(Duration.between(parseStart, Instant.now()));
if (dmlEntry == null || redoSql == null) {
@ -214,7 +219,6 @@ int processResult(ResultSet resultSet) {
dmlEntry.setScn(scn);
try {
TableId tableId = RowMapper.getTableId(catalogName, resultSet);
transactionalBuffer.registerCommitCallback(txId, scn, changeTime.toInstant(), (timestamp, smallestScn, commitScn, counter) -> {
// update SCN in offset context only if processed SCN less than SCN among other transactions
if (smallestScn == null || scn.compareTo(smallestScn) < 0) {

View File

@ -64,7 +64,6 @@ public class LogMinerStreamingChangeEventSource implements StreamingChangeEventS
private final Clock clock;
private final OracleDatabaseSchema schema;
private final OracleOffsetContext offsetContext;
private final String catalogName;
private final boolean isRac;
private final Set<String> racHosts = new HashSet<>();
private final JdbcConfiguration jdbcConfiguration;
@ -89,7 +88,6 @@ public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig,
this.schema = schema;
this.offsetContext = offsetContext;
this.connectorConfig = connectorConfig;
this.catalogName = (connectorConfig.getPdbName() != null) ? connectorConfig.getPdbName() : connectorConfig.getDatabaseName();
this.strategy = connectorConfig.getLogMiningStrategy();
this.isContinuousMining = connectorConfig.isContinuousMining();
this.errorHandler = errorHandler;
@ -142,9 +140,9 @@ public void execute(ChangeEventSourceContext context) {
final LogMinerQueryResultProcessor processor = new LogMinerQueryResultProcessor(context, jdbcConnection,
connectorConfig, logMinerMetrics, transactionalBuffer, offsetContext, schema, dispatcher,
catalogName, clock, historyRecorder);
clock, historyRecorder);
final String query = SqlUtils.logMinerContentsQuery(connectorConfig.getSchemaName(), jdbcConnection.username(), schema);
final String query = SqlUtils.logMinerContentsQuery(connectorConfig, jdbcConnection.username());
try (PreparedStatement miningView = jdbcConnection.connection().prepareStatement(query, ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT)) {
Set<String> currentRedoLogFiles = getCurrentRedoLogFiles(jdbcConnection, logMinerMetrics);

View File

@ -13,6 +13,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.relational.TableId;
import io.debezium.util.HexConverter;
@ -191,8 +192,13 @@ private static void logError(TransactionalBufferMetrics metrics, SQLException e,
LogMinerHelper.logError(metrics, "Cannot get {}. This entry from LogMiner will be lost due to the {}", s, e);
}
public static TableId getTableId(String catalogName, ResultSet rs) throws SQLException {
return new TableId(catalogName, rs.getString(SEG_OWNER), rs.getString(TABLE_NAME));
public static TableId getTableId(String catalogName, ResultSet rs) {
try {
return new TableId(catalogName, rs.getString(SEG_OWNER), rs.getString(TABLE_NAME));
}
catch (SQLException e) {
throw new DebeziumException("Cannot resolve TableId from result set data", e);
}
}
}
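
RowMapper.getTableId() builds the TableId from the catalog name plus the row's SEG_OWNER and TABLE_NAME, so once the single-schema restriction is lifted elsewhere in this commit, change events are routed per schema, which is what produces topics such as server1.DEBEZIUM2.TABLE2 in the tests below. A throwaway sketch of that naming (not connector code; it only illustrates the serverName.schema.table pattern visible in the tests):

public class TopicNameSketch {
    // Map a mined row's owner and table to the topic naming seen in this commit's tests.
    static String topicFor(String serverName, String segOwner, String tableName) {
        return serverName + "." + segOwner + "." + tableName;
    }

    public static void main(String[] args) {
        System.out.println(topicFor("server1", "DEBEZIUM", "TABLE1"));   // server1.DEBEZIUM.TABLE1
        System.out.println(topicFor("server1", "DEBEZIUM2", "TABLE2"));  // server1.DEBEZIUM2.TABLE2
    }
}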

View File

@ -9,16 +9,16 @@
import java.sql.SQLRecoverableException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.List;
import java.util.StringJoiner;
import java.util.stream.Collectors;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.relational.TableId;
import io.debezium.util.Strings;
import oracle.net.ns.NetException;
@ -199,34 +199,111 @@ static String startLogMinerStatement(Long startScn, Long endScn, OracleConnector
}
/**
* This is the query from the LogMiner view to get changes. Columns of the view we using are:
* NOTE. Currently we do not capture changes from other schemas
* SCN - The SCN at which a change was made
* COMMIT_SCN - The SCN at which a change was committed
* USERNAME - Name of the user who executed the transaction
* SQL_REDO Reconstructed SQL statement that is equivalent to the original SQL statement that made the change
* OPERATION_CODE - Number of the operation code.
* TABLE_NAME - Name of the modified table
* TIMESTAMP - Timestamp when the database change was made
* This is the query from the LogMiner view to get changes.
*
* @param schemaName user name
* The query uses the following columns from the view:
* <pre>
* SCN - The SCN at which a change was made
* SQL_REDO - Reconstructed SQL statement that is equivalent to the original SQL statement that made the change
* OPERATION_CODE - Number of the operation code
* TIMESTAMP - Timestamp when the database change was made
* XID - Transaction Identifier
* CSF - Continuation SQL flag, identifies rows that should be processed together as a single row (0=no,1=yes)
* TABLE_NAME - Name of the modified table
* SEG_OWNER - Schema/Tablespace name
* OPERATION - Database operation type
* USERNAME - Name of the user who executed the transaction
* </pre>
*
* @param connectorConfig the connector configuration
* @param logMinerUser log mining session user name
* @param schema schema
* @return the query
*/
static String logMinerContentsQuery(String schemaName, String logMinerUser, OracleDatabaseSchema schema) {
List<String> whiteListTableNames = schema.tableIds().stream().map(TableId::table).collect(Collectors.toList());
static String logMinerContentsQuery(OracleConnectorConfig connectorConfig, String logMinerUser) {
StringBuilder query = new StringBuilder();
query.append("SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME ");
query.append("FROM ").append(LOGMNR_CONTENTS_VIEW).append(" ");
query.append("WHERE OPERATION_CODE IN (1,2,3,5) ");
query.append("AND SCN >= ? ");
query.append("AND SCN < ? ");
query.append("AND TABLE_NAME != '").append(LOGMNR_FLUSH_TABLE).append("' ");
// todo: add ROW_ID, SESSION#, SERIAL#, RS_ID, and SSN
return "SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME " +
" FROM " + LOGMNR_CONTENTS_VIEW + " WHERE OPERATION_CODE in (1,2,3,5) " + // 5 - DDL
" AND SEG_OWNER = '" + schemaName.toUpperCase() + "' " +
buildTableInPredicate(whiteListTableNames) +
" AND SCN >= ? AND SCN < ? " +
// Capture DDL and MISSING_SCN rows only when not performed by SYS, SYSTEM, and LogMiner user
" OR (OPERATION_CODE IN (5,34) AND USERNAME NOT IN ('SYS','SYSTEM','" + logMinerUser.toUpperCase() + "')) " +
// Capture all COMMIT and ROLLBACK operations performed by the database
" OR (OPERATION_CODE IN (7,36))"; // todo username = schemaName?
String schemaPredicate = buildSchemaPredicate(connectorConfig);
if (!Strings.isNullOrEmpty(schemaPredicate)) {
query.append("AND ").append(schemaPredicate).append(" ");
}
String tablePredicate = buildTablePredicate(connectorConfig);
if (!Strings.isNullOrEmpty(tablePredicate)) {
query.append("AND ").append(tablePredicate).append(" ");
}
query.append("OR (OPERATION_CODE IN (5,34) AND USERNAME NOT IN ('SYS','SYSTEM','").append(logMinerUser.toUpperCase()).append("')) ");
query.append("OR (OPERATION_CODE IN (7,36))");
return query.toString();
}
private static String buildSchemaPredicate(OracleConnectorConfig connectorConfig) {
StringBuilder predicate = new StringBuilder();
if (Strings.isNullOrEmpty(connectorConfig.schemaIncludeList())) {
if (!Strings.isNullOrEmpty(connectorConfig.schemaExcludeList())) {
List<Pattern> patterns = Strings.listOfRegex(connectorConfig.schemaExcludeList(), 0);
predicate.append("(").append(listOfPatternsToSql(patterns, "SEG_OWNER", true)).append(")");
}
}
else {
List<Pattern> patterns = Strings.listOfRegex(connectorConfig.schemaIncludeList(), 0);
predicate.append("(").append(listOfPatternsToSql(patterns, "SEG_OWNER", false)).append(")");
}
return predicate.toString();
}
private static String buildTablePredicate(OracleConnectorConfig connectorConfig) {
StringBuilder predicate = new StringBuilder();
if (Strings.isNullOrEmpty(connectorConfig.tableIncludeList())) {
if (!Strings.isNullOrEmpty(connectorConfig.tableExcludeList())) {
List<Pattern> patterns = Strings.listOfRegex(connectorConfig.tableExcludeList(), 0);
predicate.append("(").append(listOfPatternsToSql(patterns, "SEG_OWNER || '.' || TABLE_NAME", true)).append(")");
}
}
else {
List<Pattern> patterns = Strings.listOfRegex(connectorConfig.tableIncludeList(), 0);
predicate.append("(").append(listOfPatternsToSql(patterns, "SEG_OWNER || '.' || TABLE_NAME", false)).append(")");
}
return predicate.toString();
}
private static String listOfPatternsToSql(List<Pattern> patterns, String columnName, boolean applyNot) {
StringBuilder predicate = new StringBuilder();
for (Iterator<Pattern> i = patterns.iterator(); i.hasNext();) {
Pattern pattern = i.next();
if (applyNot) {
predicate.append("NOT ");
}
// NOTE: The REGEXP_LIKE operator was added in Oracle 10g (10.1.0.0.0)
final String text = resolveRegExpLikePattern(pattern);
predicate.append("REGEXP_LIKE(").append(columnName).append(",'").append(text).append("','i')");
if (i.hasNext()) {
// Exclude lists imply combining them via AND, Include lists imply combining them via OR?
predicate.append(applyNot ? " AND " : " OR ");
}
}
return predicate.toString();
}
private static String resolveRegExpLikePattern(Pattern pattern) {
// The REGEXP_LIKE operator acts like LIKE in that it implicitly matches anywhere in the value,
// as if "%" were prepended and appended. We make our patterns explicit with "^" and "$" if they
// are not already anchored so that a pattern such as "DEBEZIUM" does not also match "DEBEZIUM2".
String text = pattern.pattern();
if (!text.startsWith("^")) {
text = "^" + text;
}
if (!text.endsWith("$")) {
text += "$";
}
return text;
}
static String addLogFileStatement(String option, String fileName) {
@ -335,21 +412,4 @@ public static long parseRetentionFromName(String historyTableName) {
Integer.parseInt(tokens[6])); // minutes
return Duration.between(recorded, LocalDateTime.now()).toHours();
}
/**
* This method builds table_name IN predicate, filtering out non whitelisted tables from Log Mining.
* It limits joining condition over 1000 tables, Oracle will throw exception in such predicate.
* @param tables white listed table names
* @return IN predicate or empty string if number of whitelisted tables exceeds 1000
*/
private static String buildTableInPredicate(List<String> tables) {
if (tables.size() == 0 || tables.size() > 1000) {
LOGGER.warn(" Cannot apply {} whitelisted tables condition", tables.size());
return "";
}
StringJoiner tableNames = new StringJoiner(",");
tables.forEach(table -> tableNames.add("'" + table + "'"));
return " AND table_name IN (" + tableNames + ") ";
}
}
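
To see what the new filtering actually emits, here is a standalone sketch that mirrors the listOfPatternsToSql and resolveRegExpLikePattern methods above (illustrative only, not the connector's own code path): for schema.include.list=DEBEZIUM,DEBEZIUM2 the generated fragment is REGEXP_LIKE(SEG_OWNER,'^DEBEZIUM$','i') OR REGEXP_LIKE(SEG_OWNER,'^DEBEZIUM2$','i'), which buildSchemaPredicate then wraps in parentheses and appends to the mining query.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;

public class PredicateSketch {

    // Anchor the pattern with "^" and "$" so REGEXP_LIKE matches the whole value rather than a substring.
    static String anchor(Pattern pattern) {
        String text = pattern.pattern();
        if (!text.startsWith("^")) {
            text = "^" + text;
        }
        if (!text.endsWith("$")) {
            text += "$";
        }
        return text;
    }

    // Build a REGEXP_LIKE predicate per pattern; inclusion patterns are combined with OR,
    // exclusion patterns are negated and combined with AND.
    static String toSql(List<Pattern> patterns, String columnName, boolean applyNot) {
        StringBuilder predicate = new StringBuilder();
        for (Iterator<Pattern> i = patterns.iterator(); i.hasNext();) {
            Pattern pattern = i.next();
            if (applyNot) {
                predicate.append("NOT ");
            }
            predicate.append("REGEXP_LIKE(").append(columnName).append(",'")
                    .append(anchor(pattern)).append("','i')");
            if (i.hasNext()) {
                predicate.append(applyNot ? " AND " : " OR ");
            }
        }
        return predicate.toString();
    }

    public static void main(String[] args) {
        List<Pattern> include = Arrays.asList(Pattern.compile("DEBEZIUM"), Pattern.compile("DEBEZIUM2"));
        // Prints: REGEXP_LIKE(SEG_OWNER,'^DEBEZIUM$','i') OR REGEXP_LIKE(SEG_OWNER,'^DEBEZIUM2$','i')
        System.out.println(toSql(include, "SEG_OWNER", false));
    }
}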

View File

@ -6,6 +6,7 @@
package io.debezium.connector.oracle.logminer.parser;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
/**
@ -19,9 +20,10 @@ public interface DmlParser {
*
* @param sql the sql statement
* @param tables collection of known tables.
* @param tableId the table identifier
* @param txId the current transaction id the sql is part of.
* @return the parsed sql as a DML entry or {@code null} if the SQL couldn't be parsed.
* @throws DmlParserException thrown if a parse exception is detected.
*/
LogMinerDmlEntry parse(String sql, Tables tables, String txId);
LogMinerDmlEntry parse(String sql, Tables tables, TableId tableId, String txId);
}

View File

@ -15,6 +15,7 @@
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntryImpl;
import io.debezium.data.Envelope;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
/**
@ -70,7 +71,7 @@ public class LogMinerDmlParser implements DmlParser {
private static final int WHERE_LENGTH = WHERE.length();
@Override
public LogMinerDmlEntry parse(String sql, Tables tables, String txId) {
public LogMinerDmlEntry parse(String sql, Tables tables, TableId tableId, String txId) {
if (sql != null && sql.length() > 0) {
switch (sql.charAt(0)) {
case 'i':

View File

@ -53,7 +53,6 @@ public class SimpleDmlParser implements DmlParser {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleDmlParser.class);
protected final String catalogName;
protected final String schemaName;
private final OracleValueConverters converter;
private final CCJSqlParserManager pm;
private final Map<String, LogMinerColumnValueWrapper> newColumnValues = new LinkedHashMap<>();
@ -64,18 +63,22 @@ public class SimpleDmlParser implements DmlParser {
/**
* Constructor
* @param catalogName database name
* @param schemaName user name
* @param converter value converter
*/
public SimpleDmlParser(String catalogName, String schemaName, OracleValueConverters converter) {
public SimpleDmlParser(String catalogName, OracleValueConverters converter) {
this.catalogName = catalogName;
this.schemaName = schemaName;
this.converter = converter;
pm = new CCJSqlParserManager();
}
@Override
public LogMinerDmlEntry parse(String dmlContent, Tables tables, String txId) {
/**
* This parses a DML
* @param dmlContent DML
* @param tables debezium Tables
* @param tableId the TableId for the log miner contents view row
* @return parsed value holder class
*/
public LogMinerDmlEntry parse(String dmlContent, Tables tables, TableId tableId, String txId) {
try {
// If a table contains Spatial data type, DML input generates two entries in REDO LOG.
@ -99,7 +102,7 @@ public LogMinerDmlEntry parse(String dmlContent, Tables tables, String txId) {
Statement st = pm.parse(new StringReader(dmlContent));
if (st instanceof Update) {
parseUpdate(tables, (Update) st);
parseUpdate(tables, tableId, (Update) st);
List<LogMinerColumnValue> actualNewValues = newColumnValues.values().stream()
.filter(LogMinerColumnValueWrapper::isProcessed).map(LogMinerColumnValueWrapper::getColumnValue).collect(Collectors.toList());
List<LogMinerColumnValue> actualOldValues = oldColumnValues.values().stream()
@ -108,14 +111,14 @@ public LogMinerDmlEntry parse(String dmlContent, Tables tables, String txId) {
}
else if (st instanceof Insert) {
parseInsert(tables, (Insert) st);
parseInsert(tables, tableId, (Insert) st);
List<LogMinerColumnValue> actualNewValues = newColumnValues.values()
.stream().map(LogMinerColumnValueWrapper::getColumnValue).collect(Collectors.toList());
return new LogMinerDmlEntryImpl(Envelope.Operation.CREATE, actualNewValues, Collections.emptyList());
}
else if (st instanceof Delete) {
parseDelete(tables, (Delete) st);
parseDelete(tables, tableId, (Delete) st);
List<LogMinerColumnValue> actualOldValues = oldColumnValues.values()
.stream().map(LogMinerColumnValueWrapper::getColumnValue).collect(Collectors.toList());
return new LogMinerDmlEntryImpl(Envelope.Operation.DELETE, Collections.emptyList(), actualOldValues);
@ -130,11 +133,13 @@ else if (st instanceof Delete) {
}
}
private void initColumns(Tables tables, String tableName) {
table = tables.forTable(catalogName, schemaName, tableName);
private void initColumns(Tables tables, TableId tableId, String tableName) {
if (!tableId.table().equals(tableName)) {
throw new ParsingException(null, "Resolved TableId expected table name '" + tableId.table() + "' but is '" + tableName + "'");
}
table = tables.forTable(tableId);
if (table == null) {
TableId id = new TableId(catalogName, schemaName, tableName);
throw new ParsingException(null, "Trying to parse a table '" + id + "', which does not exist.");
throw new ParsingException(null, "Trying to parse a table '" + tableId + "', which does not exist.");
}
for (int i = 0; i < table.columns().size(); i++) {
Column column = table.columns().get(i);
@ -147,13 +152,13 @@ private void initColumns(Tables tables, String tableName) {
}
// this parses simple statement with only one table
private void parseUpdate(Tables tables, Update st) throws JSQLParserException {
private void parseUpdate(Tables tables, TableId tableId, Update st) throws JSQLParserException {
int tableCount = st.getTables().size();
if (tableCount > 1 || tableCount == 0) {
throw new JSQLParserException("DML includes " + tableCount + " tables");
}
net.sf.jsqlparser.schema.Table parseTable = st.getTables().get(0);
initColumns(tables, ParserUtils.stripeQuotes(parseTable.getName()));
initColumns(tables, tableId, ParserUtils.stripeQuotes(parseTable.getName()));
List<net.sf.jsqlparser.schema.Column> columns = st.getColumns();
Alias alias = parseTable.getAlias();
@ -171,8 +176,8 @@ private void parseUpdate(Tables tables, Update st) throws JSQLParserException {
}
}
private void parseInsert(Tables tables, Insert st) {
initColumns(tables, ParserUtils.stripeQuotes(st.getTable().getName()));
private void parseInsert(Tables tables, TableId tableId, Insert st) {
initColumns(tables, tableId, ParserUtils.stripeQuotes(st.getTable().getName()));
Alias alias = st.getTable().getAlias();
aliasName = alias == null ? "" : alias.getName().trim();
@ -189,8 +194,8 @@ public void visit(ExpressionList expressionList) {
oldColumnValues.clear();
}
private void parseDelete(Tables tables, Delete st) {
initColumns(tables, ParserUtils.stripeQuotes(st.getTable().getName()));
private void parseDelete(Tables tables, TableId tableId, Delete st) {
initColumns(tables, tableId, ParserUtils.stripeQuotes(st.getTable().getName()));
Alias alias = st.getTable().getAlias();
aliasName = alias == null ? "" : alias.getName().trim();

View File

@ -48,7 +48,6 @@ public void validLogminerNoUrl() throws Exception {
.with(OracleConnectorConfig.SERVER_NAME, "myserver")
.with(OracleConnectorConfig.HOSTNAME, "MyHostname")
.with(OracleConnectorConfig.DATABASE_NAME, "mydb")
.with(OracleConnectorConfig.SCHEMA_NAME, "myschema")
.with(OracleConnectorConfig.USER, "debezium")
.build());
assertTrue(connectorConfig.validateAndRecord(OracleConnectorConfig.ALL_FIELDS, LOGGER::error));
@ -77,7 +76,6 @@ public void validLogminerWithUrl() throws Exception {
.with(OracleConnectorConfig.SERVER_NAME, "myserver")
.with(OracleConnectorConfig.URL, "MyHostname")
.with(OracleConnectorConfig.DATABASE_NAME, "mydb")
.with(OracleConnectorConfig.SCHEMA_NAME, "myschema")
.with(OracleConnectorConfig.USER, "debezium")
.build());
assertTrue(connectorConfig.validateAndRecord(OracleConnectorConfig.ALL_FIELDS, LOGGER::error));
@ -93,7 +91,6 @@ public void validUrlTNS() throws Exception {
.with(OracleConnectorConfig.URL,
"jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=192.68.1.11)(PORT=1701))(ADDRESS=(PROTOCOL=TCP)(HOST=192.68.1.12)(PORT=1701))(ADDRESS=(PROTOCOL=TCP)(HOST=192.68.1.13)(PORT=1701))(LOAD_BALANCE = yes)(FAILOVER = on)(CONNECT_DATA =(SERVER = DEDICATED)(SERVICE_NAME = myserver.mydomain.com)(FAILOVER_MODE =(TYPE = SELECT)(METHOD = BASIC)(RETRIES = 3)(DELAY = 5))))")
.with(OracleConnectorConfig.DATABASE_NAME, "mydb")
.with(OracleConnectorConfig.SCHEMA_NAME, "myschema")
.with(OracleConnectorConfig.USER, "debezium")
.build());
assertTrue(connectorConfig.validateAndRecord(OracleConnectorConfig.ALL_FIELDS, LOGGER::error));
@ -107,7 +104,6 @@ public void invalidNoHostnameNoUri() throws Exception {
.with(OracleConnectorConfig.CONNECTOR_ADAPTER, "logminer")
.with(OracleConnectorConfig.SERVER_NAME, "myserver")
.with(OracleConnectorConfig.DATABASE_NAME, "mydb")
.with(OracleConnectorConfig.SCHEMA_NAME, "myschema")
.with(OracleConnectorConfig.USER, "debezium")
.build());
assertFalse(connectorConfig.validateAndRecord(OracleConnectorConfig.ALL_FIELDS, LOGGER::error));

View File

@ -27,14 +27,13 @@
import org.junit.rules.TestRule;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.oracle.OracleConnectorConfig.SnapshotMode;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIs;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.util.Testing;
/**
@ -121,174 +120,64 @@ public void after() throws SQLException {
}
@Test
@SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.LOGMINER, reason = "LogMiner does not support DDL during streaming")
public void shouldApplyTableWhitelistConfiguration() throws Exception {
Configuration config = TestHelper.defaultConfig()
.with(
RelationalDatabaseConnectorConfig.TABLE_WHITELIST,
"DEBEZIUM2\\.TABLE2,DEBEZIUM\\.TABLE1,DEBEZIUM\\.TABLE3")
.with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.execute("INSERT INTO debezium.table1 VALUES (1, 'Text-1')");
connection.execute("INSERT INTO debezium.table2 VALUES (2, 'Text-2')");
connection.execute("COMMIT");
String ddl = "CREATE TABLE debezium.table3 (" +
" id NUMERIC(9, 0) NOT NULL, " +
" name VARCHAR2(1000), " +
" PRIMARY KEY (id)" +
")";
connection.execute(ddl);
connection.execute("GRANT SELECT ON debezium.table3 TO c##xstrm");
connection.execute("INSERT INTO debezium.table3 VALUES (3, 'Text-3')");
connection.execute("COMMIT");
SourceRecords records = consumeRecordsByTopic(2);
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE1");
assertThat(testTableRecords).hasSize(1);
VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 1);
Struct after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("NAME")).isEqualTo("Text-1");
testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE2");
assertThat(testTableRecords).isNull();
testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE3");
assertThat(testTableRecords).hasSize(1);
VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 3);
after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
assertThat(after.get("ID")).isEqualTo(3);
assertThat(after.get("NAME")).isEqualTo("Text-3");
shouldApplyTableInclusionConfiguration(true);
}
@Test
@SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.LOGMINER, reason = "LogMiner does not support DDL during streaming")
public void shouldApplyTableIncludeListConfiguration() throws Exception {
Configuration config = TestHelper.defaultConfig()
.with(
OracleConnectorConfig.TABLE_INCLUDE_LIST,
"DEBEZIUM2\\.TABLE2,DEBEZIUM\\.TABLE1,DEBEZIUM\\.TABLE3")
.with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.execute("INSERT INTO debezium.table1 VALUES (1, 'Text-1')");
connection.execute("INSERT INTO debezium.table2 VALUES (2, 'Text-2')");
connection.execute("COMMIT");
String ddl = "CREATE TABLE debezium.table3 (" +
" id NUMERIC(9, 0) NOT NULL, " +
" name VARCHAR2(1000), " +
" PRIMARY KEY (id)" +
")";
connection.execute(ddl);
connection.execute("GRANT SELECT ON debezium.table3 TO " + TestHelper.getConnectorUserName());
connection.execute("ALTER TABLE debezium.table3 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
connection.execute("INSERT INTO debezium.table3 VALUES (3, 'Text-3')");
connection.execute("COMMIT");
SourceRecords records = consumeRecordsByTopic(2);
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE1");
assertThat(testTableRecords).hasSize(1);
VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 1);
Struct after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("NAME")).isEqualTo("Text-1");
testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE2");
assertThat(testTableRecords).isNull();
testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE3");
assertThat(testTableRecords).hasSize(1);
VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 3);
after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
assertThat(after.get("ID")).isEqualTo(3);
assertThat(after.get("NAME")).isEqualTo("Text-3");
shouldApplyTableInclusionConfiguration(false);
}
@Test
@SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.LOGMINER, reason = "LogMiner does not support DDL during streaming")
public void shouldApplyTableBlacklistConfiguration() throws Exception {
Configuration config = TestHelper.defaultConfig()
.with(
OracleConnectorConfig.TABLE_BLACKLIST,
"DEBEZIUM\\.TABLE2,DEBEZIUM\\.CUSTOMER.*")
.with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.execute("INSERT INTO debezium.table1 VALUES (1, 'Text-1')");
connection.execute("INSERT INTO debezium.table2 VALUES (2, 'Text-2')");
connection.execute("COMMIT");
String ddl = "CREATE TABLE debezium.table3 (" +
" id NUMERIC(9,0) NOT NULL, " +
" name VARCHAR2(1000), " +
" PRIMARY KEY (id)" +
")";
connection.execute(ddl);
connection.execute("GRANT SELECT ON debezium.table3 TO " + TestHelper.getConnectorUserName());
connection.execute("INSERT INTO debezium.table3 VALUES (3, 'Text-3')");
connection.execute("COMMIT");
SourceRecords records = consumeRecordsByTopic(2);
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE1");
assertThat(testTableRecords).hasSize(1);
VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 1);
Struct after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("NAME")).isEqualTo("Text-1");
testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE2");
assertThat(testTableRecords).isNull();
testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE3");
assertThat(testTableRecords).hasSize(1);
VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 3);
after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
assertThat(after.get("ID")).isEqualTo(3);
assertThat(after.get("NAME")).isEqualTo("Text-3");
shouldApplyTableExclusionsConfiguration(true);
}
@Test
@SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.LOGMINER, reason = "LogMiner does not support DDL during streaming")
public void shouldApplyTableExcludeListConfiguration() throws Exception {
shouldApplyTableExclusionsConfiguration(false);
}
@Test
@FixFor("DBZ-3009")
public void shouldApplySchemaAndTableWhitelistConfiguration() throws Exception {
shouldApplySchemaAndTableInclusionConfiguration(true);
}
@Test
@FixFor("DBZ-3009")
public void shouldApplySchemaAndTableIncludeListConfiguration() throws Exception {
shouldApplySchemaAndTableInclusionConfiguration(false);
}
@Test
@FixFor("DBZ-3009")
public void shouldApplySchemaAndTableBlacklistConfiguration() throws Exception {
shouldApplySchemaAndTableExclusionsConfiguration(true);
}
@Test
@FixFor("DBZ-3009")
public void shouldApplySchemaAndTableExcludeListConfiguration() throws Exception {
shouldApplySchemaAndTableExclusionsConfiguration(false);
}
private void shouldApplyTableInclusionConfiguration(boolean useLegacyOption) throws Exception {
Field option = OracleConnectorConfig.TABLE_INCLUDE_LIST;
if (useLegacyOption) {
option = OracleConnectorConfig.TABLE_WHITELIST;
}
boolean includeDdlChanges = true;
if (TestHelper.adapter().equals(OracleConnectorConfig.ConnectorAdapter.LOG_MINER)) {
// LogMiner currently does not support DDL changes during streaming phase
includeDdlChanges = false;
}
Configuration config = TestHelper.defaultConfig()
.with(
OracleConnectorConfig.TABLE_EXCLUDE_LIST,
"DEBEZIUM\\.TABLE2,DEBEZIUM\\.CUSTOMER.*")
.with(OracleConnectorConfig.SCHEMA_INCLUDE_LIST, "DEBEZIUM")
.with(option, "DEBEZIUM2\\.TABLE2,DEBEZIUM\\.TABLE1,DEBEZIUM\\.TABLE3")
.with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.build();
@ -302,19 +191,21 @@ public void shouldApplyTableExcludeListConfiguration() throws Exception {
connection.execute("INSERT INTO debezium.table2 VALUES (2, 'Text-2')");
connection.execute("COMMIT");
String ddl = "CREATE TABLE debezium.table3 (" +
" id NUMERIC(9,0) NOT NULL, " +
" name VARCHAR2(1000), " +
" PRIMARY KEY (id)" +
")";
if (includeDdlChanges) {
String ddl = "CREATE TABLE debezium.table3 (" +
" id NUMERIC(9, 0) NOT NULL, " +
" name VARCHAR2(1000), " +
" PRIMARY KEY (id)" +
")";
connection.execute(ddl);
connection.execute("GRANT SELECT ON debezium.table3 TO " + TestHelper.getConnectorUserName());
connection.execute(ddl);
connection.execute("GRANT SELECT ON debezium.table3 TO " + TestHelper.getConnectorUserName());
connection.execute("ALTER TABLE debezium.table3 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
connection.execute("INSERT INTO debezium.table3 VALUES (3, 'Text-3')");
connection.execute("COMMIT");
}
connection.execute("INSERT INTO debezium.table3 VALUES (3, 'Text-3')");
connection.execute("COMMIT");
SourceRecords records = consumeRecordsByTopic(2);
SourceRecords records = consumeRecordsByTopic(includeDdlChanges ? 2 : 1);
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE1");
assertThat(testTableRecords).hasSize(1);
@ -327,17 +218,229 @@ public void shouldApplyTableExcludeListConfiguration() throws Exception {
testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE2");
assertThat(testTableRecords).isNull();
testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE3");
if (includeDdlChanges) {
testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE3");
assertThat(testTableRecords).hasSize(1);
VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 3);
after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
assertThat(after.get("ID")).isEqualTo(3);
assertThat(after.get("NAME")).isEqualTo("Text-3");
}
}
private void shouldApplySchemaAndTableInclusionConfiguration(boolean useLegacyOption) throws Exception {
Field option = OracleConnectorConfig.TABLE_INCLUDE_LIST;
if (useLegacyOption) {
option = OracleConnectorConfig.TABLE_WHITELIST;
}
boolean includeDdlChanges = true;
if (TestHelper.adapter().equals(OracleConnectorConfig.ConnectorAdapter.LOG_MINER)) {
// LogMiner currently does not support DDL changes during streaming phase
includeDdlChanges = false;
}
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.SCHEMA_INCLUDE_LIST, "DEBEZIUM,DEBEZIUM2")
.with(option, "DEBEZIUM2\\.TABLE2,DEBEZIUM\\.TABLE1,DEBEZIUM\\.TABLE3")
.with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.execute("INSERT INTO debezium.table1 VALUES (1, 'Text-1')");
connection.execute("INSERT INTO debezium.table2 VALUES (2, 'Text-2')");
connection.execute("INSERT INTO debezium2.table2 VALUES (1, 'Text2-1')");
connection.execute("COMMIT");
if (includeDdlChanges) {
String ddl = "CREATE TABLE debezium.table3 (" +
" id NUMERIC(9, 0) NOT NULL, " +
" name VARCHAR2(1000), " +
" PRIMARY KEY (id)" +
")";
connection.execute(ddl);
connection.execute("GRANT SELECT ON debezium.table3 TO " + TestHelper.getConnectorUserName());
connection.execute("ALTER TABLE debezium.table3 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
connection.execute("INSERT INTO debezium.table3 VALUES (3, 'Text-3')");
connection.execute("COMMIT");
}
SourceRecords records = consumeRecordsByTopic(includeDdlChanges ? 3 : 2);
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE1");
assertThat(testTableRecords).hasSize(1);
VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 3);
VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 1);
Struct after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("NAME")).isEqualTo("Text-1");
testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE2");
assertThat(testTableRecords).isNull();
testTableRecords = records.recordsForTopic("server1.DEBEZIUM2.TABLE2");
assertThat(testTableRecords).hasSize(1);
VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 1);
after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
assertThat(after.get("ID")).isEqualTo(3);
assertThat(after.get("NAME")).isEqualTo("Text-3");
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("NAME")).isEqualTo("Text2-1");
if (includeDdlChanges) {
testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE3");
assertThat(testTableRecords).hasSize(1);
VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 3);
after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
assertThat(after.get("ID")).isEqualTo(3);
assertThat(after.get("NAME")).isEqualTo("Text-3");
}
}
private void shouldApplyTableExclusionsConfiguration(boolean useLegacyOption) throws Exception {
Field option = OracleConnectorConfig.TABLE_EXCLUDE_LIST;
if (useLegacyOption) {
option = OracleConnectorConfig.TABLE_BLACKLIST;
}
boolean includeDdlChanges = true;
if (TestHelper.adapter().equals(OracleConnectorConfig.ConnectorAdapter.LOG_MINER)) {
// LogMiner currently does not support DDL changes during streaming phase
includeDdlChanges = false;
}
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.SCHEMA_INCLUDE_LIST, "DEBEZIUM")
.with(option, "DEBEZIUM\\.TABLE2,DEBEZIUM\\.CUSTOMER.*")
.with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.execute("INSERT INTO debezium.table1 VALUES (1, 'Text-1')");
connection.execute("INSERT INTO debezium.table2 VALUES (2, 'Text-2')");
connection.execute("COMMIT");
if (includeDdlChanges) {
String ddl = "CREATE TABLE debezium.table3 (" +
" id NUMERIC(9,0) NOT NULL, " +
" name VARCHAR2(1000), " +
" PRIMARY KEY (id)" +
")";
connection.execute(ddl);
connection.execute("GRANT SELECT ON debezium.table3 TO " + TestHelper.getConnectorUserName());
connection.execute("INSERT INTO debezium.table3 VALUES (3, 'Text-3')");
connection.execute("COMMIT");
}
SourceRecords records = consumeRecordsByTopic(includeDdlChanges ? 2 : 1);
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE1");
assertThat(testTableRecords).hasSize(1);
VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 1);
Struct after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("NAME")).isEqualTo("Text-1");
testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE2");
assertThat(testTableRecords).isNull();
if (includeDdlChanges) {
testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE3");
assertThat(testTableRecords).hasSize(1);
VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 3);
after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
assertThat(after.get("ID")).isEqualTo(3);
assertThat(after.get("NAME")).isEqualTo("Text-3");
}
}
private void shouldApplySchemaAndTableExclusionsConfiguration(boolean useLegacyOption) throws Exception {
Field option = OracleConnectorConfig.TABLE_EXCLUDE_LIST;
if (useLegacyOption) {
option = OracleConnectorConfig.TABLE_BLACKLIST;
}
boolean includeDdlChanges = true;
if (TestHelper.adapter().equals(OracleConnectorConfig.ConnectorAdapter.LOG_MINER)) {
// LogMiner currently does not support DDL changes during streaming phase
includeDdlChanges = false;
}
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.SCHEMA_EXCLUDE_LIST, "DEBEZIUM,SYS")
.with(option, "DEBEZIUM\\.TABLE2,DEBEZIUM\\.CUSTOMER.*")
.with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.execute("INSERT INTO debezium.table1 VALUES (1, 'Text-1')");
connection.execute("INSERT INTO debezium.table2 VALUES (2, 'Text-2')");
connection.execute("INSERT INTO debezium2.table2 VALUES (1, 'Text2-1')");
connection.execute("COMMIT");
if (includeDdlChanges) {
String ddl = "CREATE TABLE debezium.table3 (" +
" id NUMERIC(9,0) NOT NULL, " +
" name VARCHAR2(1000), " +
" PRIMARY KEY (id)" +
")";
connection.execute(ddl);
connection.execute("GRANT SELECT ON debezium.table3 TO " + TestHelper.getConnectorUserName());
connection.execute("INSERT INTO debezium.table3 VALUES (3, 'Text-3')");
connection.execute("COMMIT");
}
SourceRecords records = consumeRecordsByTopic(1);
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE1");
assertThat(testTableRecords).isNull();
testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE2");
assertThat(testTableRecords).isNull();
testTableRecords = records.recordsForTopic("server1.DEBEZIUM2.TABLE2");
assertThat(testTableRecords).hasSize(1);
VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 1);
Struct after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("NAME")).isEqualTo("Text2-1");
if (includeDdlChanges) {
testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE3");
assertThat(testTableRecords).hasSize(1);
VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 3);
after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
assertThat(after.get("ID")).isEqualTo(3);
assertThat(after.get("NAME")).isEqualTo("Text-3");
}
}
@Test
@SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER)
public void shouldTakeTimeDifference() throws Exception {
Testing.Print.enable();
String stmt = "select current_timestamp from dual";

View File

@ -255,19 +255,6 @@ public void shouldContinueWithStreamingAfterSnapshot() throws Exception {
continueStreamingAfterSnapshot(config);
}
@Test
@FixFor("DBZ-2607")
@SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.LOGMINER, reason = "Creates a backward compatibility regression")
public void shouldNotRequireDatabaseSchemaConfiguration() throws Exception {
final Map<String, ?> configMap = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
.build()
.asMap();
configMap.remove(OracleConnectorConfig.SCHEMA_NAME.name());
continueStreamingAfterSnapshot(Configuration.from(configMap));
}
private void continueStreamingAfterSnapshot(Configuration config) throws Exception {
int expectedRecordCount = 0;
connection.execute("INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018/02/22', 'yyyy-mm-dd'))");
@ -658,7 +645,7 @@ public void deleteWithoutTombstone() throws Exception {
}
@Test
@SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.LOGMINER, reason = "Seems to get caught in loop?")
@SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.LOGMINER, reason = "LogMiner does not yet support DDL during streaming")
public void shouldReadChangeStreamForTableCreatedWhileStreaming() throws Exception {
TestHelper.dropTable(connection, "debezium.customer2");

View File

@ -42,7 +42,7 @@ public void testParsingInsert() throws Exception {
"('1','Acme',TO_TIMESTAMP('2020-02-01 00:00:00.'),Unsupported Type," +
"TO_DATE('2020-02-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS'),Unsupported Type,NULL);";
LogMinerDmlEntry entry = fastDmlParser.parse(sql, null, null);
LogMinerDmlEntry entry = fastDmlParser.parse(sql, null, null, null);
assertThat(entry.getCommandType()).isEqualTo(Operation.CREATE);
assertThat(entry.getOldValues()).isEmpty();
assertThat(entry.getNewValues()).hasSize(7);
@ -72,7 +72,7 @@ public void testParsingUpdate() throws Exception {
"\"UT\" = Unsupported Type and \"DATE\" = TO_DATE('2020-02-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS') and " +
"\"UT2\" = Unsupported Type and \"C1\" = NULL;";
LogMinerDmlEntry entry = fastDmlParser.parse(sql, null, null);
LogMinerDmlEntry entry = fastDmlParser.parse(sql, null, null, null);
assertThat(entry.getCommandType()).isEqualTo(Operation.UPDATE);
assertThat(entry.getOldValues()).hasSize(7);
assertThat(entry.getOldValues().get(0).getColumnName()).isEqualTo("ID");
@ -112,7 +112,7 @@ public void testParsingDelete() throws Exception {
"where \"ID\" = '1' and \"NAME\" = 'Acme' and \"TS\" = TO_TIMESTAMP('2020-02-01 00:00:00.') and " +
"\"UT\" = Unsupported Type and \"DATE\" = TO_DATE('2020-02-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS');";
LogMinerDmlEntry entry = fastDmlParser.parse(sql, null, null);
LogMinerDmlEntry entry = fastDmlParser.parse(sql, null, null, null);
assertThat(entry.getCommandType()).isEqualTo(Operation.DELETE);
assertThat(entry.getOldValues()).hasSize(5);
assertThat(entry.getOldValues().get(0).getColumnName()).isEqualTo("ID");

View File

@ -38,6 +38,7 @@
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.Envelope;
import io.debezium.doc.FixFor;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.util.IoUtil;
@ -57,6 +58,7 @@ public class OracleDmlParserTest {
private static final String CATALOG_NAME = "ORCLPDB1";
private static final String SCHEMA_NAME = "DEBEZIUM";
private static final String FULL_TABLE_NAME = SCHEMA_NAME + "\".\"" + TABLE_NAME;
private static final TableId TABLE_ID = TableId.parse(CATALOG_NAME + "." + SCHEMA_NAME + "." + TABLE_NAME);
private static final String SPATIAL_DATA = "SDO_GEOMETRY(2003, NULL, NULL, SDO_ELEM_INFO_ARRAY(1, 1003, 1), SDO_ORDINATE_ARRAY" +
"(102604.878, 85772.8286, 101994.879, 85773.6633, 101992.739, 84209.6648, 102602.738, 84208.83, 102604.878, 85772.8286))";
private static final String SPATIAL_DATA_1 = "'unsupported type'";
@ -72,7 +74,7 @@ public void setUp() {
ddlParser = new OracleDdlParser(true, CATALOG_NAME, SCHEMA_NAME);
antlrDmlParser = new OracleDmlParser(true, CATALOG_NAME, SCHEMA_NAME, converters);
sqlDmlParser = new SimpleDmlParser(CATALOG_NAME, SCHEMA_NAME, converters);
sqlDmlParser = new SimpleDmlParser(CATALOG_NAME, converters);
tables = new Tables();
CLOB_DATA = StringUtils.repeat("clob_", 4000);
@ -98,7 +100,7 @@ public void shouldParseAliasUpdate() throws Exception {
LogMinerDmlEntry record = antlrDmlParser.getDmlEntry();
verifyUpdate(record, false, true, 9);
record = sqlDmlParser.parse(dml, tables, "1");
record = sqlDmlParser.parse(dml, tables, TABLE_ID, "1");
verifyUpdate(record, false, true, 9);
}
@ -136,7 +138,7 @@ public void shouldParseDateFormats() throws Exception {
private void parseDate(String format, boolean validateDate) {
String dml = "update \"" + FULL_TABLE_NAME + "\" a set a.\"col7\" = " + format + ", a.\"col13\" = " + format + " where a.ID = 1;";
LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables, "1");
LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables, TABLE_ID, "1");
assertThat(record).isNotNull();
assertThat(record.getNewValues()).isNotEmpty();
assertThat(record.getOldValues()).isNotEmpty();
@ -154,7 +156,7 @@ private void parseTimestamp(String format, boolean validateTimestamp) {
"and a.COL3 = 'text' and a.COL4 IS NULL and a.\"COL5\" IS NULL and a.COL6 IS NULL " +
"and a.COL8 = " + format + " and a.col11 is null;";
LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables, "1");
LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables, TABLE_ID, "1");
assertThat(record.getNewValues()).isNotEmpty();
assertThat(record.getOldValues()).isNotEmpty();
@ -174,7 +176,7 @@ public void shouldParseAliasInsert() throws Exception {
LogMinerDmlEntry record = antlrDmlParser.getDmlEntry();
verifyInsert(record);
record = sqlDmlParser.parse(dml, tables, "1");
record = sqlDmlParser.parse(dml, tables, TABLE_ID, "1");
verifyInsert(record);
}
@ -190,7 +192,7 @@ public void shouldParseAliasDelete() throws Exception {
LogMinerDmlEntry record = antlrDmlParser.getDmlEntry();
verifyDelete(record, true);
record = sqlDmlParser.parse(dml, tables, "1");
record = sqlDmlParser.parse(dml, tables, TABLE_ID, "1");
verifyDelete(record, true);
}
@ -209,7 +211,7 @@ public void shouldParseNoWhereClause() throws Exception {
LogMinerDmlEntry record = antlrDmlParser.getDmlEntry();
verifyUpdate(record, false, false, 9);
record = sqlDmlParser.parse(dml, tables, "1");
record = sqlDmlParser.parse(dml, tables, TABLE_ID, "1");
verifyUpdate(record, false, false, 9);
dml = "delete from \"" + FULL_TABLE_NAME + "\" a ";
@ -217,7 +219,7 @@ record = sqlDmlParser.parse(dml, tables, "1");
record = antlrDmlParser.getDmlEntry();
verifyDelete(record, false);
record = sqlDmlParser.parse(dml, tables, "1");
record = sqlDmlParser.parse(dml, tables, TABLE_ID, "1");
verifyDelete(record, false);
}
@ -233,7 +235,7 @@ public void shouldParseInsertAndDeleteTable() throws Exception {
LogMinerDmlEntry record = antlrDmlParser.getDmlEntry();
verifyInsert(record);
record = sqlDmlParser.parse(dml, tables, "1");
record = sqlDmlParser.parse(dml, tables, TABLE_ID, "1");
verifyInsert(record);
dml = "delete from \"" + FULL_TABLE_NAME +
@ -243,7 +245,7 @@ record = sqlDmlParser.parse(dml, tables, "1");
record = antlrDmlParser.getDmlEntry();
verifyDelete(record, true);
record = sqlDmlParser.parse(dml, tables, "");
record = sqlDmlParser.parse(dml, tables, TABLE_ID, "");
verifyDelete(record, true);
}
@ -264,7 +266,7 @@ public void shouldParseUpdateTable() throws Exception {
LogMinerDmlEntry record = antlrDmlParser.getDmlEntry();
// verifyUpdate(record, true, true);
record = sqlDmlParser.parse(dml, tables, "");
record = sqlDmlParser.parse(dml, tables, TABLE_ID, "");
verifyUpdate(record, true, true, 11);
dml = "update \"" + FULL_TABLE_NAME
@ -274,7 +276,7 @@ record = sqlDmlParser.parse(dml, tables, "");
"and COL3 = 'text' and COL4 IS NULL and \"COL5\" IS NULL and COL6 IS NULL " +
"and COL8 = TO_TIMESTAMP('2019-05-14 02:28:32') and col11 = " + SPATIAL_DATA + ";";
record = sqlDmlParser.parse(dml, tables, "");
record = sqlDmlParser.parse(dml, tables, TABLE_ID, "");
}
@Test
@ -289,7 +291,7 @@ public void shouldParseUpdateNoChangesTable() throws Exception {
+
"and COL8 = TO_TIMESTAMP('2019-05-14 02:28:32') and col11 = " + SPATIAL_DATA + ";";
LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables, "");
LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables, TABLE_ID, "");
boolean pass = record.getCommandType().equals(Envelope.Operation.UPDATE)
&& record.getOldValues().size() == record.getNewValues().size()
&& record.getNewValues().containsAll(record.getOldValues());
@ -308,7 +310,7 @@ public void shouldParseSpecialCharacters() throws Exception {
antlrDmlParser.parse(dml, tables);
assertThat(antlrDmlParser.getDmlEntry()).isNotNull();
LogMinerDmlEntry result = sqlDmlParser.parse(dml, tables, "1");
LogMinerDmlEntry result = sqlDmlParser.parse(dml, tables, TABLE_ID, "1");
assertThat(result).isNotNull();
LogMinerColumnValue value = result.getNewValues().get(2);
assertThat(value.getColumnData().toString()).contains("\\");
@ -319,7 +321,7 @@ public void shouldParseSpecialCharacters() throws Exception {
antlrDmlParser.parse(dml, tables);
assertThat(antlrDmlParser.getDmlEntry()).isNotNull();
result = sqlDmlParser.parse(dml, tables, "");
result = sqlDmlParser.parse(dml, tables, TABLE_ID, "");
assertThat(result).isNotNull();
value = result.getOldValues().get(3);
assertThat(value.getColumnData().toString()).contains("\\");
@ -330,41 +332,42 @@ public void shouldParseStrangeDml() throws Exception {
String createStatement = IoUtil.read(IoUtil.getResourceAsStream("ddl/create_table.sql", null, getClass(), null, null));
ddlParser.parse(createStatement, tables);
String dml = null;
LogMinerDmlEntry result = sqlDmlParser.parse(dml, tables, "");
LogMinerDmlEntry result = sqlDmlParser.parse(dml, tables, TABLE_ID, "");
assertThat(result).isNull();
dml = "select * from test;null;";
assertDmlParserException(dml, sqlDmlParser, tables, "");
assertDmlParserException(dml, sqlDmlParser, tables, TABLE_ID, "");
assertThat(result).isNull();
dml = "full dummy mess";
assertDmlParserException(dml, sqlDmlParser, tables, "");
assertDmlParserException(dml, sqlDmlParser, tables, TABLE_ID, "");
dml = "delete from non_exiting_table " +
" where id = 6 and col1 = 2 and col2 = 'te\\xt' and col3 = 'tExt\\' and col4 is null and col5 is null " +
" and col6 is null and col8 is null and col9 is null and col10 is null and col11 is null and col12 is null";
assertDmlParserException(dml, sqlDmlParser, tables, "");
assertDmlParserException(dml, sqlDmlParser, tables, TABLE_ID, "");
Update update = mock(Update.class);
Mockito.when(update.getTables()).thenReturn(new ArrayList<>());
dml = "update \"" + FULL_TABLE_NAME + "\" set col1 = 3 " +
" where id = 6 and col1 = 2 and col2 = 'te\\xt' and col3 = 'tExt\\' and col4 is null and col5 is null " +
" and col6 is null and col8 is null and col9 is null and col10 is null and col11 is null and col12 is null and col20 is null";
result = sqlDmlParser.parse(dml, tables, "");
result = sqlDmlParser.parse(dml, tables, TABLE_ID, "");
assertThat(result.getOldValues().size()).isEqualTo(12);
assertThat(result.getOldValues().size() == 12).isTrue();
dml = "update \"" + FULL_TABLE_NAME + "\" set col1 = 3 " +
" where id = 6 and col1 = 2 and col2 = 'te\\xt' and col30 = 'tExt\\' and col4 is null and col5 is null " +
" and col6 is null and col8 is null and col9 is null and col10 is null and col11 is null and col21 is null";
result = sqlDmlParser.parse(dml, tables, "");
result = sqlDmlParser.parse(dml, tables, TABLE_ID, "");
assertThat(result.getNewValues().size()).isEqualTo(14);
dml = "update table1, \"" + FULL_TABLE_NAME + "\" set col1 = 3 " +
" where id = 6 and col1 = 2 and col2 = 'te\\xt' and col3 = 'tExt\\' and col4 is null and col5 is null " +
" and col6 is null and col8 is null and col9 is null and col10 is null and col11 is null and col12 is null and col20 is null";
assertDmlParserException(dml, sqlDmlParser, tables, "");
assertDmlParserException(dml, sqlDmlParser, tables, TABLE_ID, "");
}
private void assertDmlParserException(String sql, DmlParser parser, Tables tables, String txId) {
private void assertDmlParserException(String sql, DmlParser parser, Tables tables, TableId tableId, String txId) {
try {
LogMinerDmlEntry dml = parser.parse(sql, tables, txId);
LogMinerDmlEntry dml = parser.parse(sql, tables, tableId, txId);
}
catch (Exception e) {
assertThat(e).isInstanceOf(DmlParserException.class);
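These parser test changes track the widened SimpleDmlParser.parse signature, which now receives the resolved TableId alongside the transaction id instead of deriving the owner from a single configured schema. A minimal sketch of the updated call site, assuming the test's existing converters, tables and name constants:
// Sketch only: the TableId is resolved up front and handed to the parser,
// which no longer needs a fixed schema name.
TableId tableId = TableId.parse(CATALOG_NAME + "." + SCHEMA_NAME + "." + TABLE_NAME);
SimpleDmlParser parser = new SimpleDmlParser(CATALOG_NAME, converters);
LogMinerDmlEntry entry = parser.parse(dml, tables, tableId, "1");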

View File

@ -24,6 +24,7 @@
import org.junit.rules.TestRule;
import org.mockito.Mockito;
import io.debezium.DebeziumException;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot.AdapterName;
@ -171,7 +172,7 @@ public void testGetTableId() throws SQLException {
tableId = RowMapper.getTableId("catalog", rs);
fail("RowMapper should not have returned a TableId");
}
catch (SQLException e) {
catch (DebeziumException e) {
assertThat(tableId).isNull();
}
}
@ -191,7 +192,7 @@ public void testGetTableIdWithVariedCase() throws SQLException {
tableId = RowMapper.getTableId("catalog", rs);
fail("RowMapper should not have returned a TableId");
}
catch (SQLException e) {
catch (DebeziumException e) {
assertThat(tableId).isNull();
}
}
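The adjusted expectations reflect that RowMapper.getTableId now reports failures as Debezium's unchecked DebeziumException rather than a checked SQLException. A hedged sketch of a caller written against the new contract, assuming a ResultSet rs positioned on a LogMiner row:
TableId tableId = null;
try {
    tableId = RowMapper.getTableId("catalog", rs);
}
catch (DebeziumException e) {
    // the owner/table columns could not be read from the LogMiner row
}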

View File

@ -12,8 +12,6 @@
import java.sql.SQLRecoverableException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.HashSet;
import java.util.Set;
import org.junit.Rule;
import org.junit.Test;
@ -21,10 +19,10 @@
import org.mockito.Mockito;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot.AdapterName;
import io.debezium.doc.FixFor;
import io.debezium.relational.TableId;
@SkipWhenAdapterNameIsNot(value = AdapterName.LOGMINER)
@ -33,6 +31,143 @@ public class SqlUtilsTest {
@Rule
public TestRule skipRule = new SkipTestDependingOnAdapterNameRule();
private static final String LOG_MINER_CONTENT_QUERY_TEMPLATE = "SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME " +
"FROM V$LOGMNR_CONTENTS WHERE OPERATION_CODE IN (1,2,3,5) AND SCN >= ? AND SCN < ? " +
"AND TABLE_NAME != '" + SqlUtils.LOGMNR_FLUSH_TABLE + "' " +
"${schemaPredicate}" +
"${tablePredicate}" +
"OR (OPERATION_CODE IN (5,34) AND USERNAME NOT IN ('SYS','SYSTEM','${user}')) " +
"OR (OPERATION_CODE IN (7,36))";
private static final String USERNAME = "USERNAME";
@Test
@FixFor("DBZ-3009")
public void testLogMinerQueryWithNoFilters() {
OracleConnectorConfig config = mock(OracleConnectorConfig.class);
Mockito.when(config.schemaIncludeList()).thenReturn(null);
Mockito.when(config.schemaExcludeList()).thenReturn(null);
Mockito.when(config.tableIncludeList()).thenReturn(null);
Mockito.when(config.tableExcludeList()).thenReturn(null);
String result = SqlUtils.logMinerContentsQuery(config, USERNAME);
assertThat(result).isEqualTo(resolveLogMineryContentQueryFromTemplate(null, null));
}
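As the helper at the end of this class shows, with no include or exclude lists both predicate placeholders collapse to empty strings, so the expectation is simply the template with ${user} substituted; in effect:
String expected = LOG_MINER_CONTENT_QUERY_TEMPLATE
        .replace("${schemaPredicate}", "")
        .replace("${tablePredicate}", "")
        .replace("${user}", USERNAME);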
@Test
@FixFor("DBZ-3009")
public void testLogMinerQueryWithSchemaInclude() {
OracleConnectorConfig config = mock(OracleConnectorConfig.class);
Mockito.when(config.schemaIncludeList()).thenReturn("SCHEMA1,SCHEMA2");
Mockito.when(config.schemaExcludeList()).thenReturn(null);
Mockito.when(config.tableIncludeList()).thenReturn(null);
Mockito.when(config.tableExcludeList()).thenReturn(null);
String schema = "AND (REGEXP_LIKE(SEG_OWNER,'^SCHEMA1$','i') OR REGEXP_LIKE(SEG_OWNER,'^SCHEMA2$','i')) ";
String result = SqlUtils.logMinerContentsQuery(config, USERNAME);
assertThat(result).isEqualTo(resolveLogMineryContentQueryFromTemplate(schema, null));
}
@Test
@FixFor("DBZ-3009")
public void testLogMinerQueryWithSchemaExclude() {
OracleConnectorConfig config = mock(OracleConnectorConfig.class);
Mockito.when(config.schemaIncludeList()).thenReturn(null);
Mockito.when(config.schemaExcludeList()).thenReturn("SCHEMA1,SCHEMA2");
Mockito.when(config.tableIncludeList()).thenReturn(null);
Mockito.when(config.tableExcludeList()).thenReturn(null);
String schema = "AND (NOT REGEXP_LIKE(SEG_OWNER,'^SCHEMA1$','i') AND NOT REGEXP_LIKE(SEG_OWNER,'^SCHEMA2$','i')) ";
String result = SqlUtils.logMinerContentsQuery(config, USERNAME);
assertThat(result).isEqualTo(resolveLogMineryContentQueryFromTemplate(schema, null));
}
@Test
@FixFor("DBZ-3009")
public void testLogMinerQueryWithTableInclude() {
OracleConnectorConfig config = mock(OracleConnectorConfig.class);
Mockito.when(config.schemaIncludeList()).thenReturn(null);
Mockito.when(config.schemaExcludeList()).thenReturn(null);
Mockito.when(config.tableIncludeList()).thenReturn("DEBEZIUM\\.TABLEA,DEBEZIUM\\.TABLEB");
Mockito.when(config.tableExcludeList()).thenReturn(null);
String table = "AND (REGEXP_LIKE(SEG_OWNER || '.' || TABLE_NAME,'^DEBEZIUM\\.TABLEA$','i') " +
"OR REGEXP_LIKE(SEG_OWNER || '.' || TABLE_NAME,'^DEBEZIUM\\.TABLEB$','i')) ";
String result = SqlUtils.logMinerContentsQuery(config, USERNAME);
assertThat(result).isEqualTo(resolveLogMineryContentQueryFromTemplate(null, table));
}
@Test
@FixFor("DBZ-3009")
public void testLogMinerQueryWithTableExcludes() {
OracleConnectorConfig config = mock(OracleConnectorConfig.class);
Mockito.when(config.schemaIncludeList()).thenReturn(null);
Mockito.when(config.schemaExcludeList()).thenReturn(null);
Mockito.when(config.tableIncludeList()).thenReturn(null);
Mockito.when(config.tableExcludeList()).thenReturn("DEBEZIUM\\.TABLEA,DEBEZIUM\\.TABLEB");
String table = "AND (NOT REGEXP_LIKE(SEG_OWNER || '.' || TABLE_NAME,'^DEBEZIUM\\.TABLEA$','i') " +
"AND NOT REGEXP_LIKE(SEG_OWNER || '.' || TABLE_NAME,'^DEBEZIUM\\.TABLEB$','i')) ";
String result = SqlUtils.logMinerContentsQuery(config, USERNAME);
assertThat(result).isEqualTo(resolveLogMineryContentQueryFromTemplate(null, table));
}
@Test
@FixFor("DBZ-3009")
public void testLogMinerQueryWithSchemaTableIncludes() {
OracleConnectorConfig config = mock(OracleConnectorConfig.class);
Mockito.when(config.schemaIncludeList()).thenReturn("SCHEMA1,SCHEMA2");
Mockito.when(config.schemaExcludeList()).thenReturn(null);
Mockito.when(config.tableIncludeList()).thenReturn("DEBEZIUM\\.TABLEA,DEBEZIUM\\.TABLEB");
Mockito.when(config.tableExcludeList()).thenReturn(null);
String schema = "AND (REGEXP_LIKE(SEG_OWNER,'^SCHEMA1$','i') OR REGEXP_LIKE(SEG_OWNER,'^SCHEMA2$','i')) ";
String table = "AND (REGEXP_LIKE(SEG_OWNER || '.' || TABLE_NAME,'^DEBEZIUM\\.TABLEA$','i') " +
"OR REGEXP_LIKE(SEG_OWNER || '.' || TABLE_NAME,'^DEBEZIUM\\.TABLEB$','i')) ";
String result = SqlUtils.logMinerContentsQuery(config, USERNAME);
assertThat(result).isEqualTo(resolveLogMineryContentQueryFromTemplate(schema, table));
}
@Test
@FixFor("DBZ-3009")
public void testLogMinerQueryWithSchemaTableExcludes() {
OracleConnectorConfig config = mock(OracleConnectorConfig.class);
Mockito.when(config.schemaIncludeList()).thenReturn(null);
Mockito.when(config.schemaExcludeList()).thenReturn("SCHEMA1,SCHEMA2");
Mockito.when(config.tableIncludeList()).thenReturn(null);
Mockito.when(config.tableExcludeList()).thenReturn("DEBEZIUM\\.TABLEA,DEBEZIUM\\.TABLEB");
String schema = "AND (NOT REGEXP_LIKE(SEG_OWNER,'^SCHEMA1$','i') AND NOT REGEXP_LIKE(SEG_OWNER,'^SCHEMA2$','i')) ";
String table = "AND (NOT REGEXP_LIKE(SEG_OWNER || '.' || TABLE_NAME,'^DEBEZIUM\\.TABLEA$','i') " +
"AND NOT REGEXP_LIKE(SEG_OWNER || '.' || TABLE_NAME,'^DEBEZIUM\\.TABLEB$','i')) ";
String result = SqlUtils.logMinerContentsQuery(config, USERNAME);
assertThat(result).isEqualTo(resolveLogMineryContentQueryFromTemplate(schema, table));
}
@Test
@FixFor("DBZ-3009")
public void testLogMinerQueryWithSchemaExcludeTableInclude() {
OracleConnectorConfig config = mock(OracleConnectorConfig.class);
Mockito.when(config.schemaIncludeList()).thenReturn(null);
Mockito.when(config.schemaExcludeList()).thenReturn("SCHEMA1,SCHEMA2");
Mockito.when(config.tableIncludeList()).thenReturn("DEBEZIUM\\.TABLEA,DEBEZIUM\\.TABLEB");
Mockito.when(config.tableExcludeList()).thenReturn(null);
String schema = "AND (NOT REGEXP_LIKE(SEG_OWNER,'^SCHEMA1$','i') AND NOT REGEXP_LIKE(SEG_OWNER,'^SCHEMA2$','i')) ";
String table = "AND (REGEXP_LIKE(SEG_OWNER || '.' || TABLE_NAME,'^DEBEZIUM\\.TABLEA$','i') " +
"OR REGEXP_LIKE(SEG_OWNER || '.' || TABLE_NAME,'^DEBEZIUM\\.TABLEB$','i')) ";
String result = SqlUtils.logMinerContentsQuery(config, USERNAME);
assertThat(result).isEqualTo(resolveLogMineryContentQueryFromTemplate(schema, table));
}
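The tests above pin down the predicate shapes SqlUtils.logMinerContentsQuery is expected to emit for schema and table include/exclude lists. Purely as an illustration of that expected output (not the SqlUtils implementation, which is outside this diff), a schema include list could be turned into the asserted fragment roughly like this:
// Hypothetical helper; for "SCHEMA1,SCHEMA2" it yields
// AND (REGEXP_LIKE(SEG_OWNER,'^SCHEMA1$','i') OR REGEXP_LIKE(SEG_OWNER,'^SCHEMA2$','i'))
static String schemaIncludePredicate(String includeList) {
    StringBuilder predicate = new StringBuilder("AND (");
    String[] patterns = includeList.split(",");
    for (int i = 0; i < patterns.length; i++) {
        if (i > 0) {
            predicate.append(" OR ");
        }
        predicate.append("REGEXP_LIKE(SEG_OWNER,'^").append(patterns[i].trim()).append("$','i')");
    }
    return predicate.append(") ").toString();
}
Exclude lists follow the same pattern with NOT REGEXP_LIKE joined by AND, and the table filters match against SEG_OWNER || '.' || TABLE_NAME instead of SEG_OWNER alone.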
@Test
public void testStatements() {
SqlUtils.setRac(false);
@ -41,28 +176,6 @@ public void testStatements() {
String expected = "BEGIN sys.dbms_logmnr.add_logfile(LOGFILENAME => 'FILENAME', OPTIONS => ADD);END;";
assertThat(expected.equals(result)).isTrue();
OracleDatabaseSchema schema = mock(OracleDatabaseSchema.class);
TableId table1 = new TableId("catalog", "schema", "table1");
TableId table2 = new TableId("catalog", "schema", "table2");
Set<TableId> tables = new HashSet<>();
Mockito.when(schema.tableIds()).thenReturn(tables);
result = SqlUtils.logMinerContentsQuery("DATABASE", "SCHEMA", schema);
expected = "SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME " +
"FROM V$LOGMNR_CONTENTS WHERE OPERATION_CODE in (1,2,3,5) AND SEG_OWNER = 'DATABASE' AND SCN >= ? AND SCN < ? " +
"OR (OPERATION_CODE IN (5,34) AND USERNAME NOT IN ('SYS','SYSTEM','SCHEMA')) " +
" OR (OPERATION_CODE IN (7,36))";
assertThat(result).isEqualTo(expected);
tables.add(table1);
tables.add(table2);
result = SqlUtils.logMinerContentsQuery("DATABASE", "SCHEMA", schema);
expected = "SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME " +
"FROM V$LOGMNR_CONTENTS WHERE OPERATION_CODE in (1,2,3,5) " +
"AND SEG_OWNER = 'DATABASE' AND table_name IN ('table1','table2') " +
"AND SCN >= ? AND SCN < ? OR (OPERATION_CODE IN (5,34) AND USERNAME NOT IN ('SYS','SYSTEM','SCHEMA')) " +
" OR (OPERATION_CODE IN (7,36))";
assertThat(result).isEqualTo(expected);
result = SqlUtils.databaseSupplementalLoggingMinCheckQuery();
expected = "SELECT 'KEY', SUPPLEMENTAL_LOG_DATA_MIN FROM V$DATABASE";
assertThat(result).isEqualTo(expected);
@ -201,4 +314,12 @@ public void shouldDetectConnectionProblems() {
assertThat(SqlUtils.connectionProblem(new Exception("12543 problem"))).isFalse();
}
private String resolveLogMineryContentQueryFromTemplate(String schemaReplacement, String tableReplacement) {
String query = LOG_MINER_CONTENT_QUERY_TEMPLATE;
query = query.replace("${schemaPredicate}", schemaReplacement == null ? "" : schemaReplacement);
query = query.replace("${tablePredicate}", tableReplacement == null ? "" : tableReplacement);
query = query.replace("${user}", USERNAME);
return query;
}
}

View File

@ -33,6 +33,7 @@
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntryImpl;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.Envelope;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.util.IoUtil;
@ -45,6 +46,7 @@ public class ValueHolderTest {
private SimpleDmlParser sqlDmlParser;
private Tables tables;
private static final String FULL_TABLE_NAME = SCHEMA_NAME + "\".\"" + TABLE_NAME;
private static final TableId TABLE_ID = TableId.parse(CATALOG_NAME + "." + SCHEMA_NAME + "." + TABLE_NAME);
@Rule
public TestRule skipRule = new SkipTestDependingOnAdapterNameRule();
@ -53,7 +55,7 @@ public class ValueHolderTest {
public void setUp() {
OracleValueConverters converters = new OracleValueConverters(new OracleConnectorConfig(TestHelper.defaultConfig().build()), null);
ddlParser = new OracleDdlParser(true, CATALOG_NAME, SCHEMA_NAME);
sqlDmlParser = new SimpleDmlParser(CATALOG_NAME, SCHEMA_NAME, converters);
sqlDmlParser = new SimpleDmlParser(CATALOG_NAME, converters);
tables = new Tables();
}
@ -81,7 +83,7 @@ public void testValueHolders() throws Exception {
ddlParser.parse(createStatement, tables);
String dml = "insert into \"" + FULL_TABLE_NAME + "\" (\"column1\",\"column2\") values ('5','Text');";
LogMinerDmlEntry dmlEntryParsed = sqlDmlParser.parse(dml, tables, "1");
LogMinerDmlEntry dmlEntryParsed = sqlDmlParser.parse(dml, tables, TABLE_ID, "1");
assertThat(dmlEntryParsed.equals(dmlEntryExpected)).isTrue();
assertThat(dmlEntryExpected.getCommandType() == Envelope.Operation.CREATE).isTrue();
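The fixture now resolves the target table once through TableId.parse rather than leaving schema resolution to the parser. Two equivalent ways to construct such an id (the catalog/schema/table values here are illustrative, not the test's actual constants):
TableId parsed = TableId.parse("ORCLPDB1.DEBEZIUM.CUSTOMER");
TableId explicit = new TableId("ORCLPDB1", "DEBEZIUM", "CUSTOMER");
// both identify the same fully qualified catalog.schema.table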

View File

@ -97,7 +97,6 @@ public static Configuration.Builder defaultConfig() {
return builder.with(OracleConnectorConfig.SERVER_NAME, SERVER_NAME)
.with(OracleConnectorConfig.PDB_NAME, "ORCLPDB1")
.with(OracleConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
.with(OracleConnectorConfig.SCHEMA_NAME, SCHEMA_USER)
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, false);
}
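With the dedicated schema setting dropped from the default test configuration, tests that need to narrow the captured scope would instead rely on the standard include/exclude list options exercised by the SqlUtilsTest cases above. A hedged sketch of such a configuration (option names assumed from those tests, values purely illustrative):
Configuration config = TestHelper.defaultConfig()
        .with("schema.include.list", "DEBEZIUM")
        .with("table.include.list", "DEBEZIUM\\.CUSTOMERS")
        .build();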