DBZ-3155 Remove legacy DML JSqlParser implementation

Chris Cranford 2021-06-29 08:38:21 -04:00 committed by Gunnar Morling
parent d804f41775
commit c7622cbc18
10 changed files with 1 addition and 1189 deletions


@@ -177,15 +177,6 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
.withDescription("Complete JDBC URL as an alternative to specifying hostname, port and database provided "
+ "as a way to support alternative connection scenarios.");
public static final Field LOG_MINING_DML_PARSER = Field.createInternal("log.mining.dml.parser")
.withDisplayName("Log Mining DML parser implementation")
.withEnum(LogMiningDmlParser.class, LogMiningDmlParser.FAST)
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("The parser implementation to use when parsing DML operations:" +
"'legacy': the legacy parser implementation based on JSqlParser; " +
"'fast': the robust parser implementation that is streamlined specifically for LogMiner redo format");
public static final Field LOG_MINING_ARCHIVE_LOG_HOURS = Field.create("log.mining.archive.log.hours")
.withDisplayName("Log Mining Archive Log Hours")
.withType(Type.LONG)
@@ -327,7 +318,6 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
LOG_MINING_SLEEP_TIME_MAX_MS,
LOG_MINING_SLEEP_TIME_INCREMENT_MS,
LOG_MINING_TRANSACTION_RETENTION,
LOG_MINING_DML_PARSER,
LOG_MINING_ARCHIVE_LOG_ONLY_MODE,
LOB_ENABLED,
LOG_MINING_ARCHIVE_DESTINATION_NAME)
@@ -372,7 +362,6 @@ public static ConfigDef configDef() {
private final Duration logMiningSleepTimeDefault;
private final Duration logMiningSleepTimeIncrement;
private final Duration logMiningTransactionRetention;
private final LogMiningDmlParser dmlParser;
private final boolean archiveLogOnlyMode;
private final boolean lobEnabled;
private final String logMiningArchiveDestinationName;
@@ -411,7 +400,6 @@ public OracleConnectorConfig(Configuration config) {
this.logMiningSleepTimeDefault = Duration.ofMillis(config.getInteger(LOG_MINING_SLEEP_TIME_DEFAULT_MS));
this.logMiningSleepTimeIncrement = Duration.ofMillis(config.getInteger(LOG_MINING_SLEEP_TIME_INCREMENT_MS));
this.logMiningTransactionRetention = Duration.ofHours(config.getInteger(LOG_MINING_TRANSACTION_RETENTION));
this.dmlParser = LogMiningDmlParser.parse(config.getString(LOG_MINING_DML_PARSER));
this.archiveLogOnlyMode = config.getBoolean(LOG_MINING_ARCHIVE_LOG_ONLY_MODE);
this.logMiningArchiveDestinationName = config.getString(LOG_MINING_ARCHIVE_DESTINATION_NAME);
}
@@ -929,13 +917,6 @@ public Duration getLogMiningTransactionRetention() {
return logMiningTransactionRetention;
}
/**
* @return the log mining parser implementation to be used
*/
public LogMiningDmlParser getLogMiningDmlParser() {
return dmlParser;
}
/**
* @return true if the connector is to mine archive logs only, false to mine all logs.
*/
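
For context: the LOG_MINING_DML_PARSER field deleted above was an internal, enum-backed switch that could force the legacy JSqlParser-based implementation. A minimal sketch of how that switch would have been set before this commit, assuming the standard io.debezium.config.Configuration builder; the property key comes from the error-message hint further down in this diff.

import io.debezium.config.Configuration;

// Sketch only: prior to this commit the internal option below selected the legacy parser;
// after it, the fast LogMinerDmlParser is always used and the option no longer exists.
class LegacyParserConfigSketch {
    static Configuration legacyParserConfig() {
        return Configuration.create()
                .with("internal.log.mining.dml.parser", "legacy") // removed by this commit
                .build();
    }
}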


@@ -26,7 +26,6 @@
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
@@ -284,12 +283,6 @@ private String convertOracleUnistr(Column column, Field fieldDefn, String data)
for (int i = 0; i < data.length(); ++i) {
char c = data.charAt(i);
if (c == '\\') {
// handle special legacy parser use case where '\' is actually '\\', necessitated by JSqlParser
// can safely be removed when SimpleDmlParser is retired
if (data.charAt(i + 1) == '\\') {
// advance by 1
i += 1;
}
if (data.length() >= (i + 4)) {
// Read next 4 character hex and convert to character.
result.append(Character.toChars(Integer.parseInt(data.substring(i + 1, i + 5), 16)));
@@ -429,14 +422,6 @@ protected Object convertDecimal(Column column, Field fieldDefn, Object data) {
data = withScaleAdjustedIfNeeded(column, (BigDecimal) data);
}
// When SimpleDmlParser is removed, the following block can be removed.
// This is necessitated by the fact SimpleDmlParser invokes the converters internally and
// won't be needed when that parser is no longer part of the source.
if (data instanceof Struct) {
SpecialValueDecimal value = VariableScaleDecimal.toLogical((Struct) data);
return value.getDecimalValue().orElse(null);
}
return super.convertDecimal(column, fieldDefn, data);
}
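
Both hunks above drop special-casing that existed only for SimpleDmlParser. For the UNISTR path, the surviving loop simply decodes each \XXXX escape. The sketch below reconstructs it from the context lines; the index advance and the plain-character branch are assumptions, since they fall outside the hunk.

// Reconstructed sketch of the remaining UNISTR decoding, not the verbatim method body.
static String decodeUnistr(String data) {
    StringBuilder result = new StringBuilder();
    for (int i = 0; i < data.length(); ++i) {
        char c = data.charAt(i);
        if (c == '\\') {
            if (data.length() >= (i + 4)) {
                // Read the next 4 hex characters and convert them to the corresponding character
                result.append(Character.toChars(Integer.parseInt(data.substring(i + 1, i + 5), 16)));
                i += 4;
            }
        }
        else {
            result.append(c);
        }
    }
    return result.toString();
}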


@@ -1,140 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.antlr.listener;
import java.util.Map;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import io.debezium.connector.oracle.OracleValueConverters;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueWrapper;
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.ValueConverter;
/**
* This class contains a few methods for parser listeners
*/
public class ParserUtils {
private ParserUtils() {
}
/**
* Strips the double quotes that surround a string
* @param text text
* @return text without surrounding double quotes
*/
public static String stripeQuotes(String text) {
if (text != null && text.indexOf("\"") == 0 && text.lastIndexOf("\"") == text.length() - 1) {
return text.substring(1, text.length() - 1);
}
return text;
}
/**
* Strips the table alias and the following dot from the given string
* @param text string with a possible alias
* @param alias table alias
* @return the stripped string
*/
public static String stripeAlias(String text, String alias) {
int index = text.indexOf(alias + ".");
if (index >= 0) {
return text.substring(alias.length() + 1);
}
return text;
}
/**
* Initialize new column values with old column values.
* It does not override new values that were already processed during WHERE clause parsing
* @param newColumnValues values to set or insert
* @param oldColumnValues values in WHERE clause
* @param table Debezium Table object
*/
public static void cloneOldToNewColumnValues(Map<String, LogMinerColumnValueWrapper> newColumnValues, Map<String, LogMinerColumnValueWrapper> oldColumnValues,
Table table) {
for (Column column : table.columns()) {
final LogMinerColumnValueWrapper newColumnValue = newColumnValues.get(column.name());
if (!newColumnValue.isProcessed()) {
final LogMinerColumnValueWrapper oldColumnValue = oldColumnValues.get(column.name());
newColumnValue.setProcessed(true);
newColumnValue.getColumnValue().setColumnData(oldColumnValue.getColumnValue().getColumnData());
}
}
}
/**
* This converts the given value to the appropriate object. The conversion is based on the column definition
*
* @param column column Object
* @param value value object
* @param converters given converter
* @return the object resulting from this conversion; it may be null if the converter cannot build the schema,
* or if the converter or the value is null
*/
public static Object convertValueToSchemaType(Column column, Object value, OracleValueConverters converters) {
if (converters != null && value != null) {
final SchemaBuilder schemaBuilder = converters.schemaBuilder(column);
if (schemaBuilder == null) {
return null;
}
final Schema schema = schemaBuilder.build();
final Field field = new Field(column.name(), 1, schema);
final ValueConverter valueConverter = converters.converter(column, field);
return valueConverter.convert(value);
}
return null;
}
/**
* In some cases values of the parsed expression are enclosed in apostrophes.
* Even null values are surrounded by single apostrophes. This method removes them.
*
* @param text supplied value which might be enclosed by apostrophes.
* @return the cleaned String, or null if text equals "null" or "NULL"
*/
public static String removeApostrophes(String text) {
if (text != null && text.indexOf("'") == 0 && text.lastIndexOf("'") == text.length() - 1) {
return text.substring(1, text.length() - 1);
}
if ("null".equalsIgnoreCase(text)) {
return null;
}
return text;
}
/**
* Handles cases where a record contains escape character(s).
* @param text value in which each backslash was doubled before parsing; this reverts that change
* @return string with single backslashes restored
*/
public static String replaceDoubleBackSlashes(String text) {
if (text != null && text.contains("\\\\")) {
return text.replaceAll("\\\\\\\\", "\\\\");
}
return text;
}
/**
* Obtains the table name
* @param tableview_name table view context
* @return table name
*/
static String getTableName(final PlSqlParser.Tableview_nameContext tableview_name) {
if (tableview_name.id_expression() != null) {
return stripeQuotes(tableview_name.id_expression().getText());
}
else {
return stripeQuotes(tableview_name.identifier().id_expression().getText());
}
}
}
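
The deleted helpers above mostly normalized the quoting and escaping in the text that JSqlParser handed back. Purely for illustration, this is how they behaved; the inputs are hypothetical and the snippet compiles only against the pre-commit tree.

import io.debezium.connector.oracle.antlr.listener.ParserUtils;

// Illustrative behavior of the removed helpers.
class ParserUtilsExamples {
    static void examples() {
        String unquoted = ParserUtils.stripeQuotes("\"COL1\"");            // -> COL1
        String noAlias = ParserUtils.stripeAlias("a.COL1", "a");           // -> COL1
        String plain = ParserUtils.removeApostrophes("'text'");            // -> text
        String nothing = ParserUtils.removeApostrophes("NULL");            // -> null
        String single = ParserUtils.replaceDoubleBackSlashes("a\\\\b");    // -> a\b (reverts the doubling applied before parsing)
    }
}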


@@ -17,18 +17,15 @@
import io.debezium.DebeziumException;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleConnectorConfig.LogMiningDmlParser;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OracleSchemaChangeEventEmitter;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.OracleValueConverters;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.parser.DmlParser;
import io.debezium.connector.oracle.logminer.parser.DmlParserException;
import io.debezium.connector.oracle.logminer.parser.LogMinerDmlParser;
import io.debezium.connector.oracle.logminer.parser.SelectLobParser;
import io.debezium.connector.oracle.logminer.parser.SimpleDmlParser;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
@@ -72,17 +69,10 @@ class LogMinerQueryResultProcessor {
this.schema = schema;
this.dispatcher = dispatcher;
this.connectorConfig = connectorConfig;
this.dmlParser = resolveParser(connectorConfig, schema.getValueConverters());
this.dmlParser = new LogMinerDmlParser();
this.selectLobParser = new SelectLobParser();
}
private static DmlParser resolveParser(OracleConnectorConfig connectorConfig, OracleValueConverters valueConverters) {
if (connectorConfig.getLogMiningDmlParser().equals(LogMiningDmlParser.LEGACY)) {
return new SimpleDmlParser(connectorConfig.getCatalogName(), valueConverters);
}
return new LogMinerDmlParser();
}
/**
* This method does all the job
* @param resultSet the info from LogMiner view
@@ -383,9 +373,6 @@ private LogMinerDmlEntry parse(String redoSql, Table table, String txId) {
StringBuilder message = new StringBuilder();
message.append("DML statement couldn't be parsed.");
message.append(" Please open a Jira issue with the statement '").append(redoSql).append("'.");
if (LogMiningDmlParser.FAST.equals(connectorConfig.getLogMiningDmlParser())) {
message.append(" You can set internal.log.mining.dml.parser='legacy' as a workaround until the parse error is fixed.");
}
throw new DmlParserException(message.toString(), e);
}
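
With resolveParser gone, the processor always instantiates the fast parser, and the parse-failure message no longer advertises a legacy fallback. A condensed reconstruction of the resulting flow, based on the hunks above; the surrounding try block is an assumption.

import io.debezium.connector.oracle.logminer.parser.DmlParser;
import io.debezium.connector.oracle.logminer.parser.DmlParserException;
import io.debezium.connector.oracle.logminer.parser.LogMinerDmlParser;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.relational.Table;

// Condensed reconstruction, not the verbatim class.
class ParserWiringSketch {
    private final DmlParser dmlParser = new LogMinerDmlParser(); // no configuration-based resolution any more

    LogMinerDmlEntry parse(String redoSql, Table table, String txId) {
        try {
            return dmlParser.parse(redoSql, table, txId);
        }
        catch (Exception e) {
            // The hint about internal.log.mining.dml.parser='legacy' was dropped along with the legacy parser.
            throw new DmlParserException("DML statement couldn't be parsed."
                    + " Please open a Jira issue with the statement '" + redoSql + "'.", e);
        }
    }
}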


@@ -1,283 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.parser;
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.oracle.OracleValueConverters;
import io.debezium.connector.oracle.antlr.listener.ParserUtils;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValue;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueImpl;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueWrapper;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntryImpl;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.text.ParsingException;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.expression.Alias;
import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.ExpressionVisitorAdapter;
import net.sf.jsqlparser.expression.operators.relational.EqualsTo;
import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
import net.sf.jsqlparser.expression.operators.relational.IsNullExpression;
import net.sf.jsqlparser.expression.operators.relational.ItemsList;
import net.sf.jsqlparser.expression.operators.relational.ItemsListVisitorAdapter;
import net.sf.jsqlparser.parser.CCJSqlParserManager;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.delete.Delete;
import net.sf.jsqlparser.statement.insert.Insert;
import net.sf.jsqlparser.statement.update.Update;
/**
* This class parses simple DML statements: insert, update, delete.
* LogMiner supplies a very simple syntax, so this parser should be sufficient to parse those statements.
* It does not support joins, merges, sub-selects, or other complicated cases, which should be acceptable for the LogMiner use case.
*/
public class SimpleDmlParser implements DmlParser {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleDmlParser.class);
protected final String catalogName;
private final OracleValueConverters converter;
private final CCJSqlParserManager pm;
private final Map<String, LogMinerColumnValueWrapper> newColumnValues = new LinkedHashMap<>();
private final Map<String, LogMinerColumnValueWrapper> oldColumnValues = new LinkedHashMap<>();
protected Table table;
private String aliasName;
/**
* Constructor
* @param catalogName database name
* @param converter value converter
*/
public SimpleDmlParser(String catalogName, OracleValueConverters converter) {
this.catalogName = catalogName;
this.converter = converter;
pm = new CCJSqlParserManager();
}
/**
* Parses a DML statement
* @param dmlContent the DML statement text
* @param table the table
* @param txId the transaction id
* @return the parsed value holder, or null if dmlContent is null
*/
@Override
public LogMinerDmlEntry parse(String dmlContent, Table table, String txId) {
try {
// If a table contains a Spatial data type, a DML statement generates two entries in the redo log:
// the first with the actual statement and the second with NULL. The second entry is not relevant at this point.
if (dmlContent == null) {
LOGGER.debug("Cannot parse NULL , transaction: {}", txId);
return null;
}
if (dmlContent.endsWith(";null;")) {
dmlContent = dmlContent.substring(0, dmlContent.lastIndexOf(";null;"));
}
if (!dmlContent.endsWith(";")) {
dmlContent = dmlContent + ";";
}
// handle cases where a record contains escape character(s); JSqlParser throws on a bare backslash, so double them up here
dmlContent = dmlContent.replaceAll("\\\\", "\\\\\\\\");
dmlContent = dmlContent.replaceAll("= Unsupported Type", "= null"); // todo address spatial data types
newColumnValues.clear();
oldColumnValues.clear();
Statement st = pm.parse(new StringReader(dmlContent));
if (st instanceof Update) {
parseUpdate(table, (Update) st);
Object[] actualNewValues = getColumnValueArray(newColumnValues, false);
Object[] actualOldValues = getColumnValueArray(oldColumnValues, false);
return LogMinerDmlEntryImpl.forUpdate(actualNewValues, actualOldValues);
}
else if (st instanceof Insert) {
parseInsert(table, (Insert) st);
Object[] actualNewValues = getColumnValueArray(newColumnValues, false);
return LogMinerDmlEntryImpl.forInsert(actualNewValues);
}
else if (st instanceof Delete) {
parseDelete(table, (Delete) st);
Object[] actualOldValues = getColumnValueArray(oldColumnValues, false);
return LogMinerDmlEntryImpl.forDelete(actualOldValues);
}
else {
throw new DmlParserException("Unexpected DML operation not supported");
}
}
catch (Throwable e) {
throw new DmlParserException("Cannot parse DML: " + dmlContent, e);
}
}
private static Object[] getColumnValueArray(Map<String, LogMinerColumnValueWrapper> map, boolean processed) {
// Internally this parser still uses the LogMinerColumnValue and LogMinerColumnValueWrapper classes as
// intermediate storage objects. Since this parser will be removed in the near future, it seemed
// appropriate to only bridge the changes when creating the LogMinerDmlEntry rather than adjust the
// inner workings of this parser entirely.
Stream<LogMinerColumnValueWrapper> stream = map.values().stream();
if (processed) {
stream = stream.filter(LogMinerColumnValueWrapper::isProcessed);
}
return stream.map(LogMinerColumnValueWrapper::getColumnValue)
.map(LogMinerColumnValue::getColumnData)
.toArray();
}
private void initColumns(Table table, String tableName) {
if (!table.id().table().equals(tableName)) {
throw new ParsingException(null, "Resolved TableId expected table name '" + table.id().table() + "' but is '" + tableName + "'");
}
this.table = table;
for (int i = 0; i < table.columns().size(); i++) {
Column column = table.columns().get(i);
String key = column.name();
String name = ParserUtils.stripeQuotes(column.name().toUpperCase());
newColumnValues.put(key, new LogMinerColumnValueWrapper(new LogMinerColumnValueImpl(name)));
oldColumnValues.put(key, new LogMinerColumnValueWrapper(new LogMinerColumnValueImpl(name)));
}
}
// this parses simple statement with only one table
private void parseUpdate(Table table, Update st) throws JSQLParserException {
int tableCount = st.getTables().size();
if (tableCount > 1 || tableCount == 0) {
throw new JSQLParserException("DML includes " + tableCount + " tables");
}
net.sf.jsqlparser.schema.Table parseTable = st.getTables().get(0);
initColumns(table, ParserUtils.stripeQuotes(parseTable.getName()));
List<net.sf.jsqlparser.schema.Column> columns = st.getColumns();
Alias alias = parseTable.getAlias();
aliasName = alias == null ? "" : alias.getName().trim();
List<Expression> expressions = st.getExpressions(); // new values
setNewValues(expressions, columns);
Expression where = st.getWhere(); // old values
if (where != null) {
parseWhereClause(where);
ParserUtils.cloneOldToNewColumnValues(newColumnValues, oldColumnValues, table);
}
else {
oldColumnValues.clear();
}
}
private void parseInsert(Table table, Insert st) {
initColumns(table, ParserUtils.stripeQuotes(st.getTable().getName()));
Alias alias = st.getTable().getAlias();
aliasName = alias == null ? "" : alias.getName().trim();
List<net.sf.jsqlparser.schema.Column> columns = st.getColumns();
ItemsList values = st.getItemsList();
values.accept(new ItemsListVisitorAdapter() {
@Override
public void visit(ExpressionList expressionList) {
super.visit(expressionList);
List<Expression> expressions = expressionList.getExpressions();
setNewValues(expressions, columns);
}
});
oldColumnValues.clear();
}
private void parseDelete(Table table, Delete st) {
initColumns(table, ParserUtils.stripeQuotes(st.getTable().getName()));
Alias alias = st.getTable().getAlias();
aliasName = alias == null ? "" : alias.getName().trim();
newColumnValues.clear();
Expression where = st.getWhere();
if (where != null) {
parseWhereClause(where);
}
else {
oldColumnValues.clear();
}
}
private void setNewValues(List<Expression> expressions, List<net.sf.jsqlparser.schema.Column> columns) {
if (expressions.size() != columns.size()) {
throw new RuntimeException("DML has " + expressions.size() + " column values, but Table object has " + columns.size() + " columns");
}
for (int i = 0; i < columns.size(); i++) {
String columnName = ParserUtils.stripeQuotes(columns.get(i).getColumnName().toUpperCase());
String value = ParserUtils.stripeQuotes(expressions.get(i).toString());
Object stripedValue = ParserUtils.removeApostrophes(value);
Column column = table.columnWithName(columnName);
if (column == null) {
LOGGER.trace("excluded column: {}", columnName);
continue;
}
Object valueObject = ParserUtils.convertValueToSchemaType(column, stripedValue, converter);
LogMinerColumnValueWrapper logMinerColumnValueWrapper = newColumnValues.get(columnName);
if (logMinerColumnValueWrapper != null) {
logMinerColumnValueWrapper.setProcessed(true);
logMinerColumnValueWrapper.getColumnValue().setColumnData(valueObject);
}
}
}
private void parseWhereClause(Expression logicalExpression) {
logicalExpression.accept(new ExpressionVisitorAdapter() {
@Override
public void visit(EqualsTo expr) {
super.visit(expr);
String columnName = expr.getLeftExpression().toString();
columnName = ParserUtils.stripeAlias(columnName, aliasName);
String value = expr.getRightExpression().toString();
columnName = ParserUtils.stripeQuotes(columnName);
Column column = table.columnWithName(columnName);
if (column == null) {
LOGGER.trace("excluded column in where clause: {}", columnName);
return;
}
value = ParserUtils.removeApostrophes(value);
LogMinerColumnValueWrapper logMinerColumnValueWrapper = oldColumnValues.get(columnName.toUpperCase());
if (logMinerColumnValueWrapper != null) {
Object valueObject = ParserUtils.convertValueToSchemaType(column, value, converter);
logMinerColumnValueWrapper.setProcessed(true);
logMinerColumnValueWrapper.getColumnValue().setColumnData(valueObject);
}
}
@Override
public void visit(IsNullExpression expr) {
super.visit(expr);
String columnName = expr.getLeftExpression().toString();
columnName = ParserUtils.stripeAlias(columnName, aliasName);
columnName = ParserUtils.stripeQuotes(columnName);
Column column = table.columnWithName(columnName);
if (column == null) {
LOGGER.trace("excluded column in where clause: {}", columnName);
return;
}
LogMinerColumnValueWrapper logMinerColumnValueWrapper = oldColumnValues.get(columnName.toUpperCase());
if (logMinerColumnValueWrapper != null) {
logMinerColumnValueWrapper.setProcessed(true);
logMinerColumnValueWrapper.getColumnValue().setColumnData(null);
}
}
});
}
}
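
The comment inside getColumnValueArray above notes that the legacy wrapper objects are only bridged into the LogMinerDmlEntry at the very end; the entry itself carries plain positional Object[] values, one slot per table column. A minimal sketch of that output shape, with hypothetical values:

import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntryImpl;

// Output shape shared by both parsers: one factory method per operation, values in table column order.
class DmlEntrySketch {
    static LogMinerDmlEntry updateEntry() {
        Object[] newValues = { 1, "new text" };
        Object[] oldValues = { 1, "old text" };
        return LogMinerDmlEntryImpl.forUpdate(newValues, oldValues);
    }
}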


@@ -1,31 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.valueholder;
/**
* @deprecated This has been deprecated and should no longer be used.
* This will be removed in conjunction with {@link io.debezium.connector.oracle.logminer.parser.SimpleDmlParser}.
*/
@Deprecated
public interface LogMinerColumnValue {
/**
* @return value of the database record
* with the exception of LOB types
*/
Object getColumnData();
/**
* @return column name
*/
String getColumnName();
/**
* This sets the database record value with the exception of LOBs
* @param columnData data
*/
void setColumnData(Object columnData);
}


@@ -1,74 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.valueholder;
import java.util.Objects;
import io.debezium.connector.oracle.antlr.listener.ParserUtils;
/**
* This class stores parsed column info
*
* @deprecated This has been deprecated and should no longer be used.
* This will be removed in conjunction with {@link io.debezium.connector.oracle.logminer.parser.SimpleDmlParser}.
*/
@Deprecated
public class LogMinerColumnValueImpl implements LogMinerColumnValue {
private String columnName;
private Object columnData;
public LogMinerColumnValueImpl(String columnName) {
this.columnName = columnName;
}
@Override
public Object getColumnData() {
return columnData;
}
@Override
public String getColumnName() {
return columnName;
}
@Override
public void setColumnData(Object columnData) {
if (columnData instanceof String) {
this.columnData = ParserUtils.replaceDoubleBackSlashes((String) columnData);
}
else {
this.columnData = columnData;
}
}
public void setColumnName(String columnName) {
this.columnName = columnName;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LogMinerColumnValueImpl that = (LogMinerColumnValueImpl) o;
return Objects.equals(columnName, that.columnName) &&
Objects.equals(columnData, that.columnData);
}
@Override
public int hashCode() {
return Objects.hash(columnName, columnData);
}
@Override
public String toString() {
return "LogMinerColumnValueImpl{columnName=" + columnName + ", columnData=" + columnData + "}";
}
}


@@ -1,40 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.valueholder;
/**
* This is a wrapper class that holds a LogMinerColumnValue
* and an indicator of whether the column was processed by a parser listener.
* A "processed" value of "true" means a listener has parsed the value;
* "false" means the value has not been parsed yet.
* The "processed" flag helps filter the resulting collections of "new" and "old" values.
*
* @deprecated This has been deprecated and should no longer be used.
* This will be removed in conjunction with {@link io.debezium.connector.oracle.logminer.parser.SimpleDmlParser}.
*/
@Deprecated
public class LogMinerColumnValueWrapper {
private boolean processed;
private final LogMinerColumnValue columnValue;
public LogMinerColumnValueWrapper(LogMinerColumnValue columnValue) {
this.columnValue = columnValue;
}
public LogMinerColumnValue getColumnValue() {
return columnValue;
}
public boolean isProcessed() {
return processed;
}
public void setProcessed(boolean processed) {
this.processed = processed;
}
}
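
The three deprecated value-holder types removed above worked as a unit: the wrapper carries a processed flag so that SimpleDmlParser could tell which columns a listener had already filled. A condensed sketch of that interplay, taken from the removed parser code; it compiles only against the pre-commit tree.

import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueImpl;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueWrapper;

// One wrapper per column; "processed" flips to true once a value has been parsed for it.
class ValueHolderSketch {
    static Object example() {
        LogMinerColumnValueWrapper wrapper = new LogMinerColumnValueWrapper(new LogMinerColumnValueImpl("COL1"));
        wrapper.setProcessed(true);
        wrapper.getColumnValue().setColumnData("text");
        return wrapper.isProcessed() ? wrapper.getColumnValue().getColumnData() : null; // -> "text"
    }
}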


@@ -1,491 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer;
import static org.fest.assertions.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.data.Struct;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.mockito.Mockito;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleValueConverters;
import io.debezium.connector.oracle.antlr.OracleDdlParser;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot.AdapterName;
import io.debezium.connector.oracle.logminer.parser.DmlParser;
import io.debezium.connector.oracle.logminer.parser.DmlParserException;
import io.debezium.connector.oracle.logminer.parser.SimpleDmlParser;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.doc.FixFor;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.util.IoUtil;
import net.sf.jsqlparser.statement.update.Update;
/**
* This is the unit test suite for the Oracle Antlr DDL parser and the JSqlParser-based DML parser
*/
@SkipWhenAdapterNameIsNot(value = AdapterName.LOGMINER)
public class OracleDmlParserTest {
private OracleDdlParser ddlParser;
private SimpleDmlParser sqlDmlParser;
private Tables tables;
private static final String TABLE_NAME = "TEST";
private static final String CATALOG_NAME = "ORCLPDB1";
private static final String SCHEMA_NAME = "DEBEZIUM";
private static final String FULL_TABLE_NAME = SCHEMA_NAME + "\".\"" + TABLE_NAME;
private static final TableId TABLE_ID = TableId.parse(CATALOG_NAME + "." + SCHEMA_NAME + "." + TABLE_NAME);
private static final String SPATIAL_DATA = "SDO_GEOMETRY(2003, NULL, NULL, SDO_ELEM_INFO_ARRAY(1, 1003, 1), SDO_ORDINATE_ARRAY" +
"(102604.878, 85772.8286, 101994.879, 85773.6633, 101992.739, 84209.6648, 102602.738, 84208.83, 102604.878, 85772.8286))";
private static final String SPATIAL_DATA_1 = "'unsupported type'";
private static String CLOB_DATA;
private static byte[] BLOB_DATA; // todo
@Rule
public TestRule skipRule = new SkipTestDependingOnAdapterNameRule();
@Before
public void setUp() {
OracleValueConverters converters = new OracleValueConverters(new OracleConnectorConfig(TestHelper.defaultConfig().build()), null);
ddlParser = new OracleDdlParser();
ddlParser.setCurrentSchema(SCHEMA_NAME);
ddlParser.setCurrentDatabase(CATALOG_NAME);
sqlDmlParser = new SimpleDmlParser(CATALOG_NAME, converters);
tables = new Tables();
CLOB_DATA = StringUtils.repeat("clob_", 4000);
String blobString = "blob_";
BLOB_DATA = Arrays.copyOf(blobString.getBytes(), 8000); // todo doesn't support blob
}
@Test
public void shouldParseAliasUpdate() throws Exception {
String createStatement = IoUtil.read(IoUtil.getResourceAsStream("ddl/create_table.sql", null, getClass(), null, null));
ddlParser.parse(createStatement, tables);
String dml = "update \"" + FULL_TABLE_NAME + "\" a set a.\"col1\" = '9', a.col2 = 'diFFerent', a.col3 = 'anotheR', a.col4 = '123', a.col6 = 5.2, " +
"a.col8 = TO_TIMESTAMP('2019-05-14 02:28:32.302000'), a.col10 = " + CLOB_DATA + ", a.col11 = null, a.col12 = '1', " +
"a.col7 = TO_DATE('2018-02-22 00:00:00', 'YYYY-MM-DD HH24:MI:SS'), " +
"a.col13 = TO_DATE('2018-02-22 00:00:00', 'YYYY-MM-DD HH24:MI:SS') " +
"where a.ID = 5 and a.COL1 = 6 and a.\"COL2\" = 'text' " +
"and a.COL3 = 'text' and a.COL4 IS NULL and a.\"COL5\" IS NULL and a.COL6 IS NULL " +
"and a.COL8 = TO_TIMESTAMP('2019-05-14 02:28:32.') and a.col11 is null;";
LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "1");
verifyUpdate(record, false, true, 14);
}
@Test
public void shouldParseTimestampFormats() throws Exception {
Locale defaultLocale = Locale.getDefault();
try {
String createStatement = IoUtil.read(IoUtil.getResourceAsStream("ddl/create_table.sql", null, getClass(), null, null));
ddlParser.parse(createStatement, tables);
String format1 = "TO_TIMESTAMP('2020-09-22 00:09:37.302000')";
String format2 = "TO_TIMESTAMP('2020-09-22 00:09:37.')";
String format3 = "TO_TIMESTAMP('2020-09-22 00:09:37')";
String format4 = "TO_TIMESTAMP('22-SEP-20 12.09.37 AM')";
String format5 = "TO_TIMESTAMP('22-SEP-20 12.09.37 PM')";
String format6 = "TO_TIMESTAMP('29-SEP-20 06.02.24.777000 PM')";
String format7 = "TO_TIMESTAMP('2020-09-22 00:09:37.0')";
parseTimestamp(format1, false);
parseTimestamp(format2, true);
parseTimestamp(format3, true);
// Change to locale that does not recognize SEP in pattern MMM
Locale.setDefault(Locale.GERMAN);
parseTimestamp(format4, true);
parseTimestamp(format5, false);
parseTimestamp(format6, false);
Locale.setDefault(defaultLocale);
parseTimestamp(format7, true);
}
finally {
Locale.setDefault(defaultLocale);
}
}
@Test
@FixFor("DBZ-2784")
public void shouldParseDateFormats() throws Exception {
String createStatement = IoUtil.read(IoUtil.getResourceAsStream("ddl/create_table.sql", null, getClass(), null, null));
ddlParser.parse(createStatement, tables);
String format1 = "TO_DATE('2018-02-22 00:00:00', 'YYYY-MM-DD HH24:MI:SS')";
parseDate(format1, true);
}
private void parseDate(String format, boolean validateDate) {
String dml = "update \"" + FULL_TABLE_NAME + "\" a set a.\"col7\" = " + format + ", a.\"col13\" = " + format + " where a.ID = 1;";
LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "1");
assertThat(record).isNotNull();
assertThat(record.getNewValues()).isNotEmpty();
assertThat(record.getOldValues()).isNotEmpty();
if (validateDate) {
assertThat(record.getNewValues()[7]).isEqualTo(1519257600000L);
assertThat(record.getNewValues()[13]).isEqualTo(1519257600000L);
}
}
private void parseTimestamp(String format, boolean validateTimestamp) {
String dml = "update \"" + FULL_TABLE_NAME + "\" a set a.\"col1\" = '9', a.col2 = 'diFFerent', a.col3 = 'anotheR', a.col4 = '123', a.col6 = 5.2, " +
"a.col8 = " + format + ", a.col10 = " + CLOB_DATA + ", a.col11 = null, a.col12 = '1' " +
"where a.ID = 5 and a.COL1 = 6 and a.\"COL2\" = 'text' " +
"and a.COL3 = 'text' and a.COL4 IS NULL and a.\"COL5\" IS NULL and a.COL6 IS NULL " +
"and a.COL8 = " + format + " and a.col11 is null;";
LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "1");
assertThat(record.getNewValues()).isNotEmpty();
assertThat(record.getOldValues()).isNotEmpty();
if (validateTimestamp) {
assertThat(record.getNewValues()[8]).isEqualTo(1600733377000000L);
}
}
@Test
public void shouldParseAliasInsert() throws Exception {
String createStatement = IoUtil.read(IoUtil.getResourceAsStream("ddl/create_table.sql", null, getClass(), null, null));
ddlParser.parse(createStatement, tables);
String dml = "insert into \"" + FULL_TABLE_NAME + "\" a (a.\"ID\",a.\"COL1\",a.\"COL2\",a.\"COL3\",a.\"COL4\",a.\"COL5\",a.\"COL6\",a.\"COL8\"," +
"a.\"COL9\",a.\"COL10\",a.\"COL13\") values ('5','4','tExt','text',NULL,NULL,NULL,NULL,EMPTY_BLOB(),EMPTY_CLOB(),TO_DATE('2018-02-22 00:00:00', 'YYYY-MM-DD HH24:MI:SS'));";
LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "1");
verifyInsert(record);
}
@Test
public void shouldParseAliasDelete() throws Exception {
String createStatement = IoUtil.read(IoUtil.getResourceAsStream("ddl/create_table.sql", null, getClass(), null, null));
ddlParser.parse(createStatement, tables);
String dml = "delete from \"" + FULL_TABLE_NAME +
"\" a where a.\"id\" = 6 and a.\"col1\" = 2 and a.\"col2\" = 'text' and a.col3 = 'tExt' and a.col4 is null and a.col5 is null " +
" and a.col6 is null and a.col8 is null and a.col9 is null and a.col10 is null and a.col11 is null and a.col12 is null";
LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "1");
verifyDelete(record, true);
}
@Test
public void shouldParseNoWhereClause() throws Exception {
String createStatement = IoUtil.read(IoUtil.getResourceAsStream("ddl/create_table.sql", null, getClass(), null, null));
ddlParser.parse(createStatement, tables);
String dml = "update \"" + FULL_TABLE_NAME
+ "\" a set a.\"id\"=1, a.\"col1\" = '9', a.col2 = 'diFFerent', a.col3 = 'anotheR', a.col4 = '123', a.col5 = null, a.col6 = 5.2, " +
"a.col8 = TO_TIMESTAMP('2019-05-14 02:28:32.302000'), a.col9=null, a.col10 = " + CLOB_DATA + ", a.col11 = null, a.col12 = '1', " +
"a.col7 = TO_DATE('2018-02-22 00:00:00', 'YYYY-MM-DD HH24:MI:SS'), " +
"a.col13 = TO_DATE('2018-02-22 00:00:00', 'YYYY-MM-DD HH24:MI:SS')";
LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "1");
verifyUpdate(record, false, false, 9);
dml = "delete from \"" + FULL_TABLE_NAME + "\" a ";
record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "1");
verifyDelete(record, false);
}
@Test
public void shouldParseInsertAndDeleteTable() throws Exception {
String createStatement = IoUtil.read(IoUtil.getResourceAsStream("ddl/create_table.sql", null, getClass(), null, null));
ddlParser.parse(createStatement, tables);
String dml = "insert into \"" + FULL_TABLE_NAME + "\"(\"ID\",\"COL1\",\"COL2\",\"COL3\",\"COL4\",\"COL5\",\"COL6\",\"COL8\"," +
"\"COL9\",\"COL10\") values ('5','4','tExt','text',NULL,NULL,NULL,NULL,EMPTY_BLOB(),EMPTY_CLOB());";
LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "1");
verifyInsert(record);
dml = "delete from \"" + FULL_TABLE_NAME +
"\" where id = 6 and col1 = 2 and col2 = 'text' and col3 = 'tExt' and col4 is null and col5 is null " +
" and col6 is null and col8 is null and col9 is null and col10 is null and col11 is null and col12 is null";
record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "");
verifyDelete(record, true);
}
// todo encrypted columns and spatial will be represented as "Unsupported Type"
@Test
public void shouldParseUpdateTable() throws Exception {
String createStatement = IoUtil.read(IoUtil.getResourceAsStream("ddl/create_table.sql", null, getClass(), null, null));
ddlParser.parse(createStatement, tables);
String dml = "update \"" + FULL_TABLE_NAME + "\" set \"col1\" = '9', col2 = 'diFFerent', col3 = 'anotheR', col4 = '123', col6 = '5.2', " +
"col8 = TO_TIMESTAMP('2019-05-14 02:28:32.302000'), col10='clob_', col12 = '1' " +
"where ID = 5 and COL1 = 6 and \"COL2\" = 'text' " +
"and COL3 = 'text' and COL4 IS NULL and \"COL5\" IS NULL and COL6 IS NULL AND COL7 = TO_DATE('2018-02-22 00:00:00', 'YYYY-MM-DD HH24:MI:SS') " +
"and COL8 = TO_TIMESTAMP('2019-05-14 02:28:32') and col11 = " + SPATIAL_DATA + " and COL13 = TO_DATE('2018-02-22 00:00:00', 'YYYY-MM-DD HH24:MI:SS');";
LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "");
verifyUpdate(record, true, true, 14);
dml = "update \"" + FULL_TABLE_NAME
+ "\" set \"col1\" = '9', col2 = '$2a$10$aHo.lQk.YAkGl5AkXbjJhODBqwNLkqF94slP5oZ3boNzm0d04WnE2', col3 = NULL, col4 = '123', col6 = '5.2', " +
"col8 = TO_TIMESTAMP('2019-05-14 02:28:32.302000'), col10='clob_', col12 = '1' " +
"where ID = 5 and COL1 = 6 and \"COL2\" = 'johan.philtjens@dpworld.com' " +
"and COL3 = 'text' and COL4 IS NULL and \"COL5\" IS NULL and COL6 IS NULL " +
"and COL8 = TO_TIMESTAMP('2019-05-14 02:28:32') and col11 = " + SPATIAL_DATA + ";";
record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "");
}
@Test
public void shouldParseUpdateNoChangesTable() throws Exception {
String createStatement = IoUtil.read(IoUtil.getResourceAsStream("ddl/create_table.sql", null, getClass(), null, null));
ddlParser.parse(createStatement, tables);
String dml = "update \"" + FULL_TABLE_NAME + "\" set \"col1\" = '6', col2 = 'text', col3 = 'text', col4 = NULL " +
"where ID = 5 and COL1 = 6 and \"COL2\" = 'text' " +
"and COL3 = Unsupported Type and COL4 IS NULL and \"COL5\" IS NULL and COL6 IS NULL and COL7 IS NULL and COL9 IS NULL and COL10 IS NULL and COL12 IS NULL "
+
"and COL8 = TO_TIMESTAMP('2019-05-14 02:28:32') and col11 = " + SPATIAL_DATA + ";";
LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "");
boolean pass = record.getOperation() == RowMapper.UPDATE
&& record.getOldValues().length == record.getNewValues().length
&& Objects.equals(record.getNewValues(), record.getOldValues());
assertThat(pass);
assertThat(record.getOldValues()[4]).isNull();
}
@Test
public void shouldParseSpecialCharacters() throws Exception {
String createStatement = IoUtil.read(IoUtil.getResourceAsStream("ddl/create_table.sql", null, getClass(), null, null));
ddlParser.parse(createStatement, tables);
String dml = "insert into \"" + FULL_TABLE_NAME + "\"(\"ID\",\"COL1\",\"COL2\",\"COL3\",\"COL4\",\"COL5\",\"COL6\",\"COL8\"," +
"\"COL9\",\"COL10\") values ('5','4','\\','\\test',NULL,NULL,NULL,NULL,EMPTY_BLOB(),EMPTY_CLOB());";
LogMinerDmlEntry result = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "1");
assertThat(result).isNotNull();
assertThat(result.getNewValues()[2].toString().contains("\\"));
dml = "delete from \"" + FULL_TABLE_NAME +
"\" where id = 6 and col1 = 2 and col2 = 'te\\xt' and col3 = 'tExt\\' and col4 is null and col5 is null " +
" and col6 is null and col8 is null and col9 is null and col10 is null and col11 is null and col12 is null";
result = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "");
assertThat(result).isNotNull();
assertThat(result.getOldValues()[3].toString().contains("\\"));
}
@Test
public void shouldParseStrangeDml() throws Exception {
String createStatement = IoUtil.read(IoUtil.getResourceAsStream("ddl/create_table.sql", null, getClass(), null, null));
ddlParser.parse(createStatement, tables);
String dml = null;
LogMinerDmlEntry result = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "");
assertThat(result).isNull();
dml = "select * from test;null;";
assertDmlParserException(dml, sqlDmlParser, tables.forTable(TABLE_ID), "");
assertThat(result).isNull();
dml = "full dummy mess";
assertDmlParserException(dml, sqlDmlParser, tables.forTable(TABLE_ID), "");
dml = "delete from non_exiting_table " +
" where id = 6 and col1 = 2 and col2 = 'te\\xt' and col3 = 'tExt\\' and col4 is null and col5 is null " +
" and col6 is null and col8 is null and col9 is null and col10 is null and col11 is null and col12 is null";
assertDmlParserException(dml, sqlDmlParser, tables.forTable(TABLE_ID), "");
Update update = mock(Update.class);
Mockito.when(update.getTables()).thenReturn(new ArrayList<>());
dml = "update \"" + FULL_TABLE_NAME + "\" set col1 = 3 " +
" where id = 6 and col1 = 2 and col2 = 'te\\xt' and col3 = 'tExt\\' and col4 is null and col5 is null " +
" and col6 is null and col8 is null and col9 is null and col10 is null and col11 is null and col12 is null and col20 is null";
result = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "");
assertThat(result.getOldValues()).hasSize(14);
dml = "update \"" + FULL_TABLE_NAME + "\" set col1 = 3 " +
" where id = 6 and col1 = 2 and col2 = 'te\\xt' and col30 = 'tExt\\' and col4 is null and col5 is null " +
" and col6 is null and col8 is null and col9 is null and col10 is null and col11 is null and col21 is null";
result = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "");
assertThat(result.getNewValues()).hasSize(14);
dml = "update table1, \"" + FULL_TABLE_NAME + "\" set col1 = 3 " +
" where id = 6 and col1 = 2 and col2 = 'te\\xt' and col3 = 'tExt\\' and col4 is null and col5 is null " +
" and col6 is null and col8 is null and col9 is null and col10 is null and col11 is null and col12 is null and col20 is null";
assertDmlParserException(dml, sqlDmlParser, tables.forTable(TABLE_ID), "");
}
private void assertDmlParserException(String sql, DmlParser parser, Table table, String txId) {
try {
LogMinerDmlEntry dml = parser.parse(sql, table, txId);
}
catch (Exception e) {
assertThat(e).isInstanceOf(DmlParserException.class);
}
}
private void verifyUpdate(LogMinerDmlEntry record, boolean checkGeometry, boolean checkOldValues, int oldValuesNumber) {
// validate
assertThat(record.getOperation()).isEqualTo(RowMapper.UPDATE);
assertThat(record.getNewValues()).hasSize(14);
for (int i = 0; i < record.getNewValues().length; ++i) {
Object newValue = record.getNewValues()[i];
switch (i) {
case 1:
assertThat(newValue).isEqualTo(BigDecimal.valueOf(900, 2));
break;
case 2:
assertThat(newValue).isEqualTo("diFFerent");
break;
case 3:
assertThat(newValue).isEqualTo("anotheR");
break;
case 4:
assertThat(newValue).isEqualTo("123");
break;
case 6:
assertThat(((Struct) newValue).get("scale")).isEqualTo(1);
assertThat(((byte[]) ((Struct) newValue).get("value"))[0]).isEqualTo((byte) 52);
break;
case 7:
case 13:
assertThat(newValue).isInstanceOf(Long.class);
assertThat(newValue).isEqualTo(1519257600000L);
break;
case 8:
assertThat(newValue).isInstanceOf(Long.class);
assertThat(newValue).isEqualTo(1557800912302000L);
break;
case 10:
assertThat(newValue).isInstanceOf(String.class);
assertThat(newValue.toString().contains("clob_")).isTrue();
break;
case 11:
if (checkGeometry) {
assertThat(newValue).isInstanceOf(String.class);
assertThat(newValue.toString().contains("SDO_GEOMETRY")).isTrue();
}
else {
assertThat(newValue).isNull();
}
break;
case 12:
assertThat(newValue).isInstanceOf(Byte.class);
assertThat(newValue).isEqualTo(Byte.valueOf("1"));
break;
}
}
if (!checkOldValues) {
assertThat(record.getOldValues()).hasSize(0);
}
else {
assertThat(record.getOldValues()).hasSize(oldValuesNumber);
for (int i = 0; i < record.getOldValues().length; ++i) {
Object oldValue = record.getOldValues()[i];
switch (i) {
case 1:
assertThat(oldValue).isEqualTo(BigDecimal.valueOf(600, 2));
break;
case 2:
assertThat(oldValue).isEqualTo("text");
break;
case 3:
assertThat(oldValue).isEqualTo("text");
break;
case 4:
assertThat(oldValue).isNull();
break;
case 5:
assertThat(oldValue).isNull();
break;
case 6:
assertThat(oldValue).isNull();
break;
case 8:
assertThat(oldValue).isInstanceOf(Long.class);
assertThat(oldValue).isEqualTo(1557800912000000L);
break;
case 11:
if (checkGeometry) {
assertThat(oldValue).isInstanceOf(String.class);
assertThat(oldValue.toString().contains("SDO_GEOMETRY")).isTrue();
}
else {
assertThat(oldValue).isNull();
}
break;
case 0:
assertThat(oldValue).isEqualTo(new BigDecimal(5));
break;
}
}
}
}
private void verifyInsert(LogMinerDmlEntry record) {
assertThat(record.getOldValues()).hasSize(0);
assertThat(record.getOperation()).isEqualTo(RowMapper.INSERT);
assertThat(record.getNewValues()).hasSize(14);
assertThat(record.getNewValues()[0]).isEqualTo(new BigDecimal(5));
assertThat(record.getNewValues()[1]).isEqualTo(BigDecimal.valueOf(400, 2));
assertThat(record.getNewValues()[2]).isEqualTo("tExt");
assertThat(record.getNewValues()[3]).isEqualTo("text");
assertThat(record.getNewValues()[4]).isNull();
assertThat(record.getNewValues()[5]).isNull();
assertThat(record.getNewValues()[6]).isNull();
assertThat(record.getNewValues()[7]).isNull();
// todo handle LOBS
// assertThat(iterator.next().getColumnData()).isNull();
// assertThat(iterator.next().getColumnData()).isNull();
}
private void verifyDelete(LogMinerDmlEntry record, boolean checkOldValues) {
assertThat(record.getOperation()).isEqualTo(RowMapper.DELETE);
assertThat(record.getNewValues()).hasSize(0);
if (!checkOldValues) {
assertThat(record.getOldValues()).hasSize(0);
}
else {
assertThat(record.getOldValues()).hasSize(14);
assertThat(record.getOldValues()[0]).isEqualTo(new BigDecimal(6));
assertThat(record.getOldValues()[1]).isEqualTo(BigDecimal.valueOf(200, 2));
assertThat(record.getOldValues()[2]).isEqualTo("text");
assertThat(record.getOldValues()[3]).isEqualTo("tExt");
assertThat(record.getOldValues()[4]).isNull();
assertThat(record.getOldValues()[5]).isNull();
assertThat(record.getOldValues()[6]).isNull();
assertThat(record.getOldValues()[7]).isNull();
assertThat(record.getOldValues()[8]).isNull();
assertThat(record.getOldValues()[9]).isNull();
assertThat(record.getOldValues()[10]).isNull();
}
}
}
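
One note on the removed assertDmlParserException helper: if parse unexpectedly succeeds, no assertion runs and the test passes vacuously. A stricter drop-in sketch in the same fest-assert style is shown below; it additionally requires import static org.junit.Assert.fail and is a suggestion, not part of this commit.

// Sketch of a stricter helper: fails when no exception is thrown at all.
private void assertDmlParserException(String sql, DmlParser parser, Table table, String txId) {
    try {
        parser.parse(sql, table, txId);
        fail("Expected a DmlParserException for: " + sql);
    }
    catch (Exception e) {
        assertThat(e).isInstanceOf(DmlParserException.class);
    }
}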


@@ -1,82 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer;
import static org.fest.assertions.Assertions.assertThat;
import java.math.BigDecimal;
import java.math.BigInteger;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleValueConverters;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.antlr.OracleDdlParser;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot.AdapterName;
import io.debezium.connector.oracle.logminer.parser.SimpleDmlParser;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntryImpl;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.util.IoUtil;
@SkipWhenAdapterNameIsNot(value = AdapterName.LOGMINER)
public class ValueHolderTest {
private static final Scn SCN_ONE = new Scn(BigInteger.ONE);
private static final String TABLE_NAME = "TEST";
private static final String CATALOG_NAME = "CATALOG";
private static final String SCHEMA_NAME = "DEBEZIUM";
private OracleDdlParser ddlParser;
private SimpleDmlParser sqlDmlParser;
private Tables tables;
private static final String FULL_TABLE_NAME = SCHEMA_NAME + "\".\"" + TABLE_NAME;
private static final TableId TABLE_ID = TableId.parse(CATALOG_NAME + "." + SCHEMA_NAME + "." + TABLE_NAME);
@Rule
public TestRule skipRule = new SkipTestDependingOnAdapterNameRule();
@Before
public void setUp() {
OracleValueConverters converters = new OracleValueConverters(new OracleConnectorConfig(TestHelper.defaultConfig().build()), null);
ddlParser = new OracleDdlParser();
ddlParser.setCurrentSchema(SCHEMA_NAME);
ddlParser.setCurrentDatabase(CATALOG_NAME);
sqlDmlParser = new SimpleDmlParser(CATALOG_NAME, converters);
tables = new Tables();
}
@Test
public void testValueHolders() throws Exception {
final Object[] newValues = new Object[2];
newValues[0] = new BigDecimal(5);
newValues[1] = "Text";
LogMinerDmlEntry dmlEntryExpected = LogMinerDmlEntryImpl.forInsert(newValues);
dmlEntryExpected.setObjectName(TABLE_NAME);
dmlEntryExpected.setObjectOwner(SCHEMA_NAME);
String createStatement = IoUtil.read(IoUtil.getResourceAsStream("ddl/create_small_table.sql", null, getClass(), null, null));
ddlParser.parse(createStatement, tables);
String dml = "insert into \"" + FULL_TABLE_NAME + "\" (\"column1\",\"column2\") values ('5','Text');";
LogMinerDmlEntry dmlEntryParsed = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "1");
assertThat(dmlEntryParsed.equals(dmlEntryExpected)).isTrue();
assertThat(dmlEntryExpected.getOperation()).isEqualTo(RowMapper.INSERT);
assertThat(dmlEntryExpected.getObjectOwner().equals(SCHEMA_NAME)).isTrue();
assertThat(dmlEntryExpected.getObjectName().equals(TABLE_NAME)).isTrue();
assertThat(dmlEntryExpected.equals(null)).isFalse();
assertThat(dmlEntryExpected.equals(dmlEntryExpected)).isTrue();
}
}