DBZ-3400 Remove Oracle Antlr DML parser
This commit is contained in:
parent
d4a72d779d
commit
8f953dc9fe
@ -1,95 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright Debezium Authors.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*/
|
|
||||||
package io.debezium.connector.oracle.antlr;
|
|
||||||
|
|
||||||
import org.antlr.v4.runtime.CharStream;
|
|
||||||
import org.antlr.v4.runtime.CommonTokenStream;
|
|
||||||
import org.antlr.v4.runtime.tree.ParseTree;
|
|
||||||
|
|
||||||
import io.debezium.antlr.AntlrDdlParser;
|
|
||||||
import io.debezium.antlr.AntlrDdlParserListener;
|
|
||||||
import io.debezium.antlr.DataTypeResolver;
|
|
||||||
import io.debezium.connector.oracle.OracleValueConverters;
|
|
||||||
import io.debezium.connector.oracle.antlr.listener.OracleDmlParserListener;
|
|
||||||
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
|
|
||||||
import io.debezium.ddl.parser.oracle.generated.PlSqlLexer;
|
|
||||||
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
|
|
||||||
import io.debezium.relational.SystemVariables;
|
|
||||||
import io.debezium.relational.Tables;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This is the main Oracle Antlr DML parser
|
|
||||||
*/
|
|
||||||
public class OracleDmlParser extends AntlrDdlParser<PlSqlLexer, PlSqlParser> {
|
|
||||||
|
|
||||||
protected final String catalogName;
|
|
||||||
protected final String schemaName;
|
|
||||||
private final OracleValueConverters converter;
|
|
||||||
private LogMinerDmlEntry dmlEntry;
|
|
||||||
|
|
||||||
public OracleDmlParser(boolean throwErrorsFromTreeWalk, final String catalogName, final String schemaName, OracleValueConverters converter) {
|
|
||||||
super(throwErrorsFromTreeWalk);
|
|
||||||
this.catalogName = catalogName;
|
|
||||||
this.schemaName = schemaName;
|
|
||||||
this.converter = converter;
|
|
||||||
}
|
|
||||||
|
|
||||||
public LogMinerDmlEntry getDmlEntry() {
|
|
||||||
return dmlEntry;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setDmlEntry(LogMinerDmlEntry dml) {
|
|
||||||
this.dmlEntry = dml;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void parse(String dmlContent, Tables databaseTables) {
|
|
||||||
if (!dmlContent.endsWith(";")) {
|
|
||||||
dmlContent = dmlContent + ";";
|
|
||||||
}
|
|
||||||
// DML content is case sensitive
|
|
||||||
super.parse(dmlContent, databaseTables);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ParseTree parseTree(PlSqlParser parser) {
|
|
||||||
return parser.unit_statement();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected AntlrDdlParserListener createParseTreeWalkerListener() {
|
|
||||||
return new OracleDmlParserListener(catalogName, schemaName, this);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected PlSqlLexer createNewLexerInstance(CharStream charStreams) {
|
|
||||||
return new PlSqlLexer(charStreams);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected PlSqlParser createNewParserInstance(CommonTokenStream commonTokenStream) {
|
|
||||||
return new PlSqlParser(commonTokenStream);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected boolean isGrammarInUpperCase() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected DataTypeResolver initializeDataTypeResolver() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected SystemVariables createNewSystemVariablesInstance() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
public OracleValueConverters getConverters() {
|
|
||||||
return converter;
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,68 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright Debezium Authors.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*/
|
|
||||||
package io.debezium.connector.oracle.antlr.listener;
|
|
||||||
|
|
||||||
import static io.debezium.connector.oracle.antlr.listener.ParserUtils.getTableName;
|
|
||||||
|
|
||||||
import java.util.LinkedHashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import io.debezium.connector.oracle.OracleValueConverters;
|
|
||||||
import io.debezium.connector.oracle.antlr.OracleDmlParser;
|
|
||||||
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueImpl;
|
|
||||||
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueWrapper;
|
|
||||||
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
|
|
||||||
import io.debezium.ddl.parser.oracle.generated.PlSqlParserBaseListener;
|
|
||||||
import io.debezium.relational.Column;
|
|
||||||
import io.debezium.relational.Table;
|
|
||||||
import io.debezium.text.ParsingException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This class contains common methods for DML parser listeners
|
|
||||||
*/
|
|
||||||
abstract class BaseDmlParserListener<T> extends PlSqlParserBaseListener {
|
|
||||||
|
|
||||||
protected String catalogName;
|
|
||||||
protected String schemaName;
|
|
||||||
protected Table table;
|
|
||||||
final OracleValueConverters converter;
|
|
||||||
String alias;
|
|
||||||
|
|
||||||
protected OracleDmlParser parser;
|
|
||||||
|
|
||||||
Map<T, LogMinerColumnValueWrapper> newColumnValues = new LinkedHashMap<>();
|
|
||||||
Map<T, LogMinerColumnValueWrapper> oldColumnValues = new LinkedHashMap<>();
|
|
||||||
|
|
||||||
BaseDmlParserListener(String catalogName, String schemaName, OracleDmlParser parser) {
|
|
||||||
this.parser = parser;
|
|
||||||
this.catalogName = catalogName;
|
|
||||||
this.schemaName = schemaName;
|
|
||||||
this.converter = parser.getConverters();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Defines the key of the Map of LogMinerColumnValueWrapper. It could be String or Integer
|
|
||||||
abstract protected T getKey(Column column, int index);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This method prepares all column value placeholders, based on the table metadata
|
|
||||||
* @param ctx DML table expression context
|
|
||||||
*/
|
|
||||||
void init(PlSqlParser.Dml_table_expression_clauseContext ctx) {
|
|
||||||
String tableName = getTableName(ctx.tableview_name());
|
|
||||||
table = parser.databaseTables().forTable(catalogName, schemaName, tableName);
|
|
||||||
if (table == null) {
|
|
||||||
throw new ParsingException(null, "Trying to parse a table, which does not exist.");
|
|
||||||
}
|
|
||||||
for (int i = 0; i < table.columns().size(); i++) {
|
|
||||||
Column column = table.columns().get(i);
|
|
||||||
int type = column.jdbcType();
|
|
||||||
T key = getKey(column, i);
|
|
||||||
String name = ParserUtils.stripeQuotes(column.name().toUpperCase());
|
|
||||||
newColumnValues.put(key, new LogMinerColumnValueWrapper(new LogMinerColumnValueImpl(name, type)));
|
|
||||||
oldColumnValues.put(key, new LogMinerColumnValueWrapper(new LogMinerColumnValueImpl(name, type)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,73 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright Debezium Authors.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*/
|
|
||||||
package io.debezium.connector.oracle.antlr.listener;
|
|
||||||
|
|
||||||
import io.debezium.connector.oracle.antlr.OracleDmlParser;
|
|
||||||
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueWrapper;
|
|
||||||
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
|
|
||||||
import io.debezium.relational.Column;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This class parses recursively logical expression tree for DELETE and UPDATE statements
|
|
||||||
*/
|
|
||||||
abstract class BaseDmlStringParserListener extends BaseDmlParserListener<String> {
|
|
||||||
|
|
||||||
boolean isUpdate;
|
|
||||||
|
|
||||||
BaseDmlStringParserListener(String catalogName, String schemaName, OracleDmlParser parser) {
|
|
||||||
super(catalogName, schemaName, parser);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void enterTable_alias(PlSqlParser.Table_aliasContext ctx) {
|
|
||||||
alias = ctx.getText().toUpperCase();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Logical expressions are trees and (column name, value) pairs are nested in this tree.
|
|
||||||
* This methods extracts those pairs and store them in List<LogMinerColumnValue> oldValues
|
|
||||||
* This method is used by VALUES parsers of update and delete statements.
|
|
||||||
* @param logicalExpression expression tree
|
|
||||||
*/
|
|
||||||
void parseRecursively(PlSqlParser.Logical_expressionContext logicalExpression) {
|
|
||||||
|
|
||||||
int count = logicalExpression.logical_expression().size();
|
|
||||||
if (count == 0) {
|
|
||||||
|
|
||||||
String nullValue = logicalExpression.getStop().getText();
|
|
||||||
|
|
||||||
String expression = logicalExpression.getText();
|
|
||||||
String columnName = "";
|
|
||||||
String value = "";
|
|
||||||
if (expression.contains("=")) {
|
|
||||||
columnName = expression.substring(0, expression.indexOf("=")).toUpperCase();
|
|
||||||
value = expression.substring(expression.indexOf("=") + 1);
|
|
||||||
}
|
|
||||||
if ("null".equalsIgnoreCase(nullValue)) {
|
|
||||||
columnName = expression.substring(0, expression.toUpperCase().indexOf("ISNULL")).toUpperCase();
|
|
||||||
value = nullValue;
|
|
||||||
}
|
|
||||||
|
|
||||||
columnName = ParserUtils.stripeAlias(columnName, alias);
|
|
||||||
columnName = ParserUtils.stripeQuotes(columnName);
|
|
||||||
|
|
||||||
Column column = table.columnWithName(columnName);
|
|
||||||
Object stripedValue = ParserUtils.removeApostrophes(value);
|
|
||||||
|
|
||||||
LogMinerColumnValueWrapper logMinerColumnValueWrapper = oldColumnValues.get(columnName);
|
|
||||||
if (logMinerColumnValueWrapper != null) { // todo this used to happen for ROWID pseudo column. Test if this is not a problem after NO_ROWID_IN_STMT option
|
|
||||||
Object valueObject = ParserUtils.convertValueToSchemaType(column, stripedValue, converter);
|
|
||||||
logMinerColumnValueWrapper.setProcessed(true);
|
|
||||||
logMinerColumnValueWrapper.getColumnValue().setColumnData(valueObject);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
for (int i = 0; i < count; i++) {
|
|
||||||
parseRecursively(logicalExpression.logical_expression(i));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -1,67 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright Debezium Authors.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*/
|
|
||||||
package io.debezium.connector.oracle.antlr.listener;
|
|
||||||
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import io.debezium.connector.oracle.antlr.OracleDmlParser;
|
|
||||||
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValue;
|
|
||||||
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueWrapper;
|
|
||||||
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
|
|
||||||
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntryImpl;
|
|
||||||
import io.debezium.data.Envelope;
|
|
||||||
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
|
|
||||||
import io.debezium.relational.Column;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This class parses delete statements.
|
|
||||||
* LogMiner instruments all the values in WHERE cause regardless of original statement.
|
|
||||||
* In other words if the original statement is
|
|
||||||
* delete from debezium where col1 = 2 and if there are 2 records to delete,
|
|
||||||
* LogMiner will contain following two statements:
|
|
||||||
*
|
|
||||||
* delete from "DEBEZIUM" where "ID" = 6 and "COL1" = 2 and "COL2" = 'text' and "COL3" = 'text' and "COL4" IS NULL and "COL5" IS NULL and "COL6" IS NULL and "COL7" IS NULL and "COL8" IS NULL
|
|
||||||
* delete from "DEBEZIUM" where "ID" = 7 and "COL1" = 2 and "COL2" = 'text' and "COL3" = 'text' and "COL4" IS NULL and "COL5" IS NULL and "COL6" IS NULL and "COL7" IS NULL and "COL8" IS NULL
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public class DeleteParserListener extends BaseDmlStringParserListener {
|
|
||||||
|
|
||||||
DeleteParserListener(final String catalogName, final String schemaName, final OracleDmlParser parser) {
|
|
||||||
super(catalogName, schemaName, parser);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected String getKey(Column column, int index) {
|
|
||||||
return column.name();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void enterDelete_statement(PlSqlParser.Delete_statementContext ctx) {
|
|
||||||
init(ctx.general_table_ref().dml_table_expression_clause());
|
|
||||||
newColumnValues.clear();
|
|
||||||
PlSqlParser.Table_aliasContext tableAlias = ctx.general_table_ref().table_alias();
|
|
||||||
alias = tableAlias == null ? "" : tableAlias.getText().toUpperCase();
|
|
||||||
PlSqlParser.Where_clauseContext where = ctx.where_clause();
|
|
||||||
if (where != null) {
|
|
||||||
parseRecursively(ctx.where_clause().expression().logical_expression());
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
oldColumnValues.clear();
|
|
||||||
}
|
|
||||||
super.enterDelete_statement(ctx);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void exitDelete_statement(PlSqlParser.Delete_statementContext ctx) {
|
|
||||||
List<LogMinerColumnValue> actualOldValues = oldColumnValues.values()
|
|
||||||
.stream().map(LogMinerColumnValueWrapper::getColumnValue).collect(Collectors.toList());
|
|
||||||
LogMinerDmlEntry newRecord = new LogMinerDmlEntryImpl(Envelope.Operation.DELETE, Collections.emptyList(), actualOldValues);
|
|
||||||
parser.setDmlEntry(newRecord);
|
|
||||||
super.exitDelete_statement(ctx);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,87 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright Debezium Authors.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*/
|
|
||||||
package io.debezium.connector.oracle.antlr.listener;
|
|
||||||
|
|
||||||
import static io.debezium.antlr.AntlrDdlParser.getText;
|
|
||||||
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import io.debezium.connector.oracle.antlr.OracleDmlParser;
|
|
||||||
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValue;
|
|
||||||
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueWrapper;
|
|
||||||
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
|
|
||||||
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntryImpl;
|
|
||||||
import io.debezium.data.Envelope;
|
|
||||||
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
|
|
||||||
import io.debezium.relational.Column;
|
|
||||||
import io.debezium.text.ParsingException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This class parses Oracle INSERT statements.
|
|
||||||
* if the original tested query was: insert into DEBEZIUM (id,col3) values (2, 'some text')
|
|
||||||
*
|
|
||||||
* LogMiner will supply:
|
|
||||||
*
|
|
||||||
* insert into "DEBEZIUM"("ID","COL1","COL2","COL3","COL4","COL5","COL6","COL7","COL8","COL9","COL10")
|
|
||||||
* values (2,NULL,'debezium','some text',NULL,NULL,NULL,NULL,NULL,EMPTY_BLOB(),EMPTY_CLOB())
|
|
||||||
* update "DEBEZIUM" set "COL9" = NULL, "COL10" = NULL where "ID" = 2 and "COL1" IS NULL and "COL2" = 'debezium'
|
|
||||||
* and "COL3" = 'some text' and "COL4" IS NULL and "COL5" IS NULL and "COL6" IS NULL
|
|
||||||
* and "COL7" IS NULL and "COL8" IS NULL
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public class InsertParserListener extends BaseDmlParserListener<Integer> {
|
|
||||||
|
|
||||||
InsertParserListener(String catalogName, String schemaName, OracleDmlParser parser) {
|
|
||||||
super(catalogName, schemaName, parser);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Integer getKey(Column column, int index) {
|
|
||||||
return index;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void enterInsert_statement(PlSqlParser.Insert_statementContext ctx) {
|
|
||||||
init(ctx.single_table_insert().insert_into_clause().general_table_ref().dml_table_expression_clause());
|
|
||||||
oldColumnValues.clear();
|
|
||||||
super.enterInsert_statement(ctx);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void enterValues_clause(PlSqlParser.Values_clauseContext ctx) {
|
|
||||||
if (table == null) {
|
|
||||||
throw new ParsingException(null, "Trying to parse a statement for a table which does not exist. " +
|
|
||||||
"Statement: " + getText(ctx));
|
|
||||||
}
|
|
||||||
|
|
||||||
List<PlSqlParser.ExpressionContext> values = ctx.expressions().expression();
|
|
||||||
for (int i = 0; i < values.size(); i++) {
|
|
||||||
PlSqlParser.ExpressionContext value = values.get(i);
|
|
||||||
LogMinerColumnValueWrapper columnObject = newColumnValues.get(i);
|
|
||||||
|
|
||||||
String columnName = columnObject.getColumnValue().getColumnName();
|
|
||||||
Column column = table.columnWithName(columnName);
|
|
||||||
|
|
||||||
String valueText = value.logical_expression().getText();
|
|
||||||
valueText = ParserUtils.removeApostrophes(valueText);
|
|
||||||
Object valueObject = ParserUtils.convertValueToSchemaType(column, valueText, converter);
|
|
||||||
|
|
||||||
columnObject.getColumnValue().setColumnData(valueObject);
|
|
||||||
}
|
|
||||||
super.enterValues_clause(ctx);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void exitSingle_table_insert(PlSqlParser.Single_table_insertContext ctx) {
|
|
||||||
List<LogMinerColumnValue> actualNewValues = newColumnValues.values()
|
|
||||||
.stream().map(LogMinerColumnValueWrapper::getColumnValue).collect(Collectors.toList());
|
|
||||||
LogMinerDmlEntry newRecord = new LogMinerDmlEntryImpl(Envelope.Operation.CREATE, actualNewValues, Collections.emptyList());
|
|
||||||
parser.setDmlEntry(newRecord);
|
|
||||||
super.exitSingle_table_insert(ctx);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,54 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright Debezium Authors.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*/
|
|
||||||
package io.debezium.connector.oracle.antlr.listener;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
|
||||||
|
|
||||||
import org.antlr.v4.runtime.ParserRuleContext;
|
|
||||||
import org.antlr.v4.runtime.tree.ParseTreeListener;
|
|
||||||
|
|
||||||
import io.debezium.antlr.AntlrDdlParserListener;
|
|
||||||
import io.debezium.antlr.ProxyParseTreeListenerUtil;
|
|
||||||
import io.debezium.connector.oracle.antlr.OracleDmlParser;
|
|
||||||
import io.debezium.ddl.parser.oracle.generated.PlSqlParserBaseListener;
|
|
||||||
import io.debezium.text.ParsingException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This class is Oracle main DML parser listener class.
|
|
||||||
* It instantiates supported listeners, walks listeners through every parsing rule and collects parsing exceptions.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public class OracleDmlParserListener extends PlSqlParserBaseListener implements AntlrDdlParserListener {
|
|
||||||
|
|
||||||
private final List<ParseTreeListener> listeners = new CopyOnWriteArrayList<>();
|
|
||||||
private final Collection<ParsingException> errors = new ArrayList<>();
|
|
||||||
|
|
||||||
public OracleDmlParserListener(final String catalogName, final String schemaName,
|
|
||||||
final OracleDmlParser parser) {
|
|
||||||
listeners.add(new InsertParserListener(catalogName, schemaName, parser));
|
|
||||||
listeners.add(new UpdateParserListener(catalogName, schemaName, parser));
|
|
||||||
listeners.add(new DeleteParserListener(catalogName, schemaName, parser));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Collection<ParsingException> getErrors() {
|
|
||||||
return errors;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void enterEveryRule(ParserRuleContext ctx) {
|
|
||||||
ProxyParseTreeListenerUtil.delegateEnterRule(ctx, listeners, errors);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void exitEveryRule(ParserRuleContext ctx) {
|
|
||||||
ProxyParseTreeListenerUtil.delegateExitRule(ctx, listeners, errors);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -1,104 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright Debezium Authors.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*/
|
|
||||||
package io.debezium.connector.oracle.antlr.listener;
|
|
||||||
|
|
||||||
import static io.debezium.antlr.AntlrDdlParser.getText;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import io.debezium.connector.oracle.antlr.OracleDmlParser;
|
|
||||||
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValue;
|
|
||||||
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueWrapper;
|
|
||||||
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
|
|
||||||
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntryImpl;
|
|
||||||
import io.debezium.data.Envelope;
|
|
||||||
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
|
|
||||||
import io.debezium.relational.Column;
|
|
||||||
import io.debezium.text.ParsingException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This class parses UPDATE statements.
|
|
||||||
* For the original query:
|
|
||||||
* update debezium set test = '7' where test1 = '6' (let's assume we have 3 records with such value)
|
|
||||||
*
|
|
||||||
* LogMiner with supply:
|
|
||||||
*
|
|
||||||
* update "debezium" set "TEST" = '7' where "DUMMY" = '1' and "TEST" = '2' and "TEST1" = '6' and "TEST2" = '1'
|
|
||||||
* update "debezium" set "TEST" = '7' where "DUMMY" = '2' and "TEST" = '2' and "TEST1" = '6' and "TEST2" = '1'
|
|
||||||
* update "debezium" set "TEST" = '7' where "DUMMY" = '3' and "TEST" = '2' and "TEST1" = '6' and "TEST2" = '1'
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public class UpdateParserListener extends BaseDmlStringParserListener {
|
|
||||||
|
|
||||||
UpdateParserListener(String catalogName, String schemaName, OracleDmlParser parser) {
|
|
||||||
super(catalogName, schemaName, parser);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected String getKey(Column column, int index) {
|
|
||||||
return column.name();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void enterUpdate_statement(PlSqlParser.Update_statementContext ctx) {
|
|
||||||
init(ctx.general_table_ref().dml_table_expression_clause());
|
|
||||||
isUpdate = true;
|
|
||||||
super.enterUpdate_statement(ctx);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* this method could be invoked by delete, insert or update statements, but we should act on update only
|
|
||||||
* @param ctx where clause context
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public void enterWhere_clause(PlSqlParser.Where_clauseContext ctx) {
|
|
||||||
if (isUpdate) {
|
|
||||||
parseRecursively(ctx.expression().logical_expression());
|
|
||||||
ParserUtils.cloneOldToNewColumnValues(newColumnValues, oldColumnValues, table);
|
|
||||||
}
|
|
||||||
isUpdate = false;
|
|
||||||
super.enterWhere_clause(ctx);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void enterColumn_based_update_set_clause(PlSqlParser.Column_based_update_set_clauseContext ctx) {
|
|
||||||
if (table == null) {
|
|
||||||
throw new ParsingException(null, "Trying to parse a statement for a table which does not exist. " +
|
|
||||||
"Statement: " + getText(ctx));
|
|
||||||
}
|
|
||||||
String columnName = ctx.column_name().getText().toUpperCase();
|
|
||||||
String stripedName = ParserUtils.stripeAlias(columnName, alias);
|
|
||||||
stripedName = ParserUtils.stripeQuotes(stripedName);
|
|
||||||
String value = ctx.getText().substring(columnName.length() + 1);
|
|
||||||
String nullValue = ctx.expression().getStop().getText();
|
|
||||||
if ("null".equalsIgnoreCase(nullValue)) {
|
|
||||||
value = nullValue;
|
|
||||||
}
|
|
||||||
Object stripedValue = ParserUtils.removeApostrophes(value);
|
|
||||||
|
|
||||||
Column column = table.columnWithName(stripedName);
|
|
||||||
Object valueObject = ParserUtils.convertValueToSchemaType(column, stripedValue, converter);
|
|
||||||
|
|
||||||
LogMinerColumnValueWrapper logMinerColumnValueWrapper = newColumnValues.get(stripedName);
|
|
||||||
logMinerColumnValueWrapper.setProcessed(true);
|
|
||||||
logMinerColumnValueWrapper.getColumnValue().setColumnData(valueObject);
|
|
||||||
|
|
||||||
super.enterColumn_based_update_set_clause(ctx);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void exitUpdate_statement(PlSqlParser.Update_statementContext ctx) {
|
|
||||||
List<LogMinerColumnValue> actualNewValues = newColumnValues.values().stream()
|
|
||||||
.filter(LogMinerColumnValueWrapper::isProcessed).map(LogMinerColumnValueWrapper::getColumnValue).collect(Collectors.toList());
|
|
||||||
List<LogMinerColumnValue> actualOldValues = oldColumnValues.values().stream()
|
|
||||||
.filter(LogMinerColumnValueWrapper::isProcessed).map(LogMinerColumnValueWrapper::getColumnValue).collect(Collectors.toList());
|
|
||||||
LogMinerDmlEntry newRecord = new LogMinerDmlEntryImpl(Envelope.Operation.UPDATE, actualNewValues, actualOldValues);
|
|
||||||
parser.setDmlEntry(newRecord);
|
|
||||||
super.exitUpdate_statement(ctx);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -26,7 +26,6 @@
|
|||||||
import io.debezium.connector.oracle.OracleConnectorConfig;
|
import io.debezium.connector.oracle.OracleConnectorConfig;
|
||||||
import io.debezium.connector.oracle.OracleValueConverters;
|
import io.debezium.connector.oracle.OracleValueConverters;
|
||||||
import io.debezium.connector.oracle.antlr.OracleDdlParser;
|
import io.debezium.connector.oracle.antlr.OracleDdlParser;
|
||||||
import io.debezium.connector.oracle.antlr.OracleDmlParser;
|
|
||||||
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
|
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
|
||||||
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
|
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
|
||||||
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot.AdapterName;
|
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot.AdapterName;
|
||||||
@ -52,7 +51,6 @@
|
|||||||
public class OracleDmlParserTest {
|
public class OracleDmlParserTest {
|
||||||
|
|
||||||
private OracleDdlParser ddlParser;
|
private OracleDdlParser ddlParser;
|
||||||
private OracleDmlParser antlrDmlParser;
|
|
||||||
private SimpleDmlParser sqlDmlParser;
|
private SimpleDmlParser sqlDmlParser;
|
||||||
private Tables tables;
|
private Tables tables;
|
||||||
private static final String TABLE_NAME = "TEST";
|
private static final String TABLE_NAME = "TEST";
|
||||||
@ -74,7 +72,6 @@ public void setUp() {
|
|||||||
OracleValueConverters converters = new OracleValueConverters(new OracleConnectorConfig(TestHelper.defaultConfig().build()), null);
|
OracleValueConverters converters = new OracleValueConverters(new OracleConnectorConfig(TestHelper.defaultConfig().build()), null);
|
||||||
|
|
||||||
ddlParser = new OracleDdlParser(true, CATALOG_NAME, SCHEMA_NAME);
|
ddlParser = new OracleDdlParser(true, CATALOG_NAME, SCHEMA_NAME);
|
||||||
antlrDmlParser = new OracleDmlParser(true, CATALOG_NAME, SCHEMA_NAME, converters);
|
|
||||||
sqlDmlParser = new SimpleDmlParser(CATALOG_NAME, converters);
|
sqlDmlParser = new SimpleDmlParser(CATALOG_NAME, converters);
|
||||||
tables = new Tables();
|
tables = new Tables();
|
||||||
|
|
||||||
@ -97,11 +94,7 @@ public void shouldParseAliasUpdate() throws Exception {
|
|||||||
"and a.COL3 = 'text' and a.COL4 IS NULL and a.\"COL5\" IS NULL and a.COL6 IS NULL " +
|
"and a.COL3 = 'text' and a.COL4 IS NULL and a.\"COL5\" IS NULL and a.COL6 IS NULL " +
|
||||||
"and a.COL8 = TO_TIMESTAMP('2019-05-14 02:28:32.') and a.col11 is null;";
|
"and a.COL8 = TO_TIMESTAMP('2019-05-14 02:28:32.') and a.col11 is null;";
|
||||||
|
|
||||||
antlrDmlParser.parse(dml, tables);
|
LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "1");
|
||||||
LogMinerDmlEntry record = antlrDmlParser.getDmlEntry();
|
|
||||||
verifyUpdate(record, false, true, 9);
|
|
||||||
|
|
||||||
record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "1");
|
|
||||||
verifyUpdate(record, false, true, 9);
|
verifyUpdate(record, false, true, 9);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -173,11 +166,8 @@ public void shouldParseAliasInsert() throws Exception {
|
|||||||
|
|
||||||
String dml = "insert into \"" + FULL_TABLE_NAME + "\" a (a.\"ID\",a.\"COL1\",a.\"COL2\",a.\"COL3\",a.\"COL4\",a.\"COL5\",a.\"COL6\",a.\"COL8\"," +
|
String dml = "insert into \"" + FULL_TABLE_NAME + "\" a (a.\"ID\",a.\"COL1\",a.\"COL2\",a.\"COL3\",a.\"COL4\",a.\"COL5\",a.\"COL6\",a.\"COL8\"," +
|
||||||
"a.\"COL9\",a.\"COL10\",a.\"COL13\") values ('5','4','tExt','text',NULL,NULL,NULL,NULL,EMPTY_BLOB(),EMPTY_CLOB(),TO_DATE('2018-02-22 00:00:00', 'YYYY-MM-DD HH24:MI:SS'));";
|
"a.\"COL9\",a.\"COL10\",a.\"COL13\") values ('5','4','tExt','text',NULL,NULL,NULL,NULL,EMPTY_BLOB(),EMPTY_CLOB(),TO_DATE('2018-02-22 00:00:00', 'YYYY-MM-DD HH24:MI:SS'));";
|
||||||
antlrDmlParser.parse(dml, tables);
|
|
||||||
LogMinerDmlEntry record = antlrDmlParser.getDmlEntry();
|
|
||||||
verifyInsert(record);
|
|
||||||
|
|
||||||
record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "1");
|
LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "1");
|
||||||
verifyInsert(record);
|
verifyInsert(record);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -189,11 +179,8 @@ public void shouldParseAliasDelete() throws Exception {
|
|||||||
String dml = "delete from \"" + FULL_TABLE_NAME +
|
String dml = "delete from \"" + FULL_TABLE_NAME +
|
||||||
"\" a where a.\"id\" = 6 and a.\"col1\" = 2 and a.\"col2\" = 'text' and a.col3 = 'tExt' and a.col4 is null and a.col5 is null " +
|
"\" a where a.\"id\" = 6 and a.\"col1\" = 2 and a.\"col2\" = 'text' and a.col3 = 'tExt' and a.col4 is null and a.col5 is null " +
|
||||||
" and a.col6 is null and a.col8 is null and a.col9 is null and a.col10 is null and a.col11 is null and a.col12 is null";
|
" and a.col6 is null and a.col8 is null and a.col9 is null and a.col10 is null and a.col11 is null and a.col12 is null";
|
||||||
antlrDmlParser.parse(dml, tables);
|
|
||||||
LogMinerDmlEntry record = antlrDmlParser.getDmlEntry();
|
|
||||||
verifyDelete(record, true);
|
|
||||||
|
|
||||||
record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "1");
|
LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "1");
|
||||||
verifyDelete(record, true);
|
verifyDelete(record, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -208,17 +195,10 @@ public void shouldParseNoWhereClause() throws Exception {
|
|||||||
"a.col7 = TO_DATE('2018-02-22 00:00:00', 'YYYY-MM-DD HH24:MI:SS'), " +
|
"a.col7 = TO_DATE('2018-02-22 00:00:00', 'YYYY-MM-DD HH24:MI:SS'), " +
|
||||||
"a.col13 = TO_DATE('2018-02-22 00:00:00', 'YYYY-MM-DD HH24:MI:SS')";
|
"a.col13 = TO_DATE('2018-02-22 00:00:00', 'YYYY-MM-DD HH24:MI:SS')";
|
||||||
|
|
||||||
antlrDmlParser.parse(dml, tables);
|
LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "1");
|
||||||
LogMinerDmlEntry record = antlrDmlParser.getDmlEntry();
|
|
||||||
verifyUpdate(record, false, false, 9);
|
|
||||||
|
|
||||||
record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "1");
|
|
||||||
verifyUpdate(record, false, false, 9);
|
verifyUpdate(record, false, false, 9);
|
||||||
|
|
||||||
dml = "delete from \"" + FULL_TABLE_NAME + "\" a ";
|
dml = "delete from \"" + FULL_TABLE_NAME + "\" a ";
|
||||||
antlrDmlParser.parse(dml, tables);
|
|
||||||
record = antlrDmlParser.getDmlEntry();
|
|
||||||
verifyDelete(record, false);
|
|
||||||
|
|
||||||
record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "1");
|
record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "1");
|
||||||
verifyDelete(record, false);
|
verifyDelete(record, false);
|
||||||
@ -232,19 +212,13 @@ public void shouldParseInsertAndDeleteTable() throws Exception {
|
|||||||
|
|
||||||
String dml = "insert into \"" + FULL_TABLE_NAME + "\"(\"ID\",\"COL1\",\"COL2\",\"COL3\",\"COL4\",\"COL5\",\"COL6\",\"COL8\"," +
|
String dml = "insert into \"" + FULL_TABLE_NAME + "\"(\"ID\",\"COL1\",\"COL2\",\"COL3\",\"COL4\",\"COL5\",\"COL6\",\"COL8\"," +
|
||||||
"\"COL9\",\"COL10\") values ('5','4','tExt','text',NULL,NULL,NULL,NULL,EMPTY_BLOB(),EMPTY_CLOB());";
|
"\"COL9\",\"COL10\") values ('5','4','tExt','text',NULL,NULL,NULL,NULL,EMPTY_BLOB(),EMPTY_CLOB());";
|
||||||
antlrDmlParser.parse(dml, tables);
|
|
||||||
LogMinerDmlEntry record = antlrDmlParser.getDmlEntry();
|
|
||||||
verifyInsert(record);
|
|
||||||
|
|
||||||
record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "1");
|
LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "1");
|
||||||
verifyInsert(record);
|
verifyInsert(record);
|
||||||
|
|
||||||
dml = "delete from \"" + FULL_TABLE_NAME +
|
dml = "delete from \"" + FULL_TABLE_NAME +
|
||||||
"\" where id = 6 and col1 = 2 and col2 = 'text' and col3 = 'tExt' and col4 is null and col5 is null " +
|
"\" where id = 6 and col1 = 2 and col2 = 'text' and col3 = 'tExt' and col4 is null and col5 is null " +
|
||||||
" and col6 is null and col8 is null and col9 is null and col10 is null and col11 is null and col12 is null";
|
" and col6 is null and col8 is null and col9 is null and col10 is null and col11 is null and col12 is null";
|
||||||
antlrDmlParser.parse(dml, tables);
|
|
||||||
record = antlrDmlParser.getDmlEntry();
|
|
||||||
verifyDelete(record, true);
|
|
||||||
|
|
||||||
record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "");
|
record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "");
|
||||||
verifyDelete(record, true);
|
verifyDelete(record, true);
|
||||||
@ -263,11 +237,7 @@ public void shouldParseUpdateTable() throws Exception {
|
|||||||
"and COL3 = 'text' and COL4 IS NULL and \"COL5\" IS NULL and COL6 IS NULL AND COL7 = TO_DATE('2018-02-22 00:00:00', 'YYYY-MM-DD HH24:MI:SS') " +
|
"and COL3 = 'text' and COL4 IS NULL and \"COL5\" IS NULL and COL6 IS NULL AND COL7 = TO_DATE('2018-02-22 00:00:00', 'YYYY-MM-DD HH24:MI:SS') " +
|
||||||
"and COL8 = TO_TIMESTAMP('2019-05-14 02:28:32') and col11 = " + SPATIAL_DATA + " and COL13 = TO_DATE('2018-02-22 00:00:00', 'YYYY-MM-DD HH24:MI:SS');";
|
"and COL8 = TO_TIMESTAMP('2019-05-14 02:28:32') and col11 = " + SPATIAL_DATA + " and COL13 = TO_DATE('2018-02-22 00:00:00', 'YYYY-MM-DD HH24:MI:SS');";
|
||||||
|
|
||||||
antlrDmlParser.parse(dml, tables);
|
LogMinerDmlEntry record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "");
|
||||||
LogMinerDmlEntry record = antlrDmlParser.getDmlEntry();
|
|
||||||
// verifyUpdate(record, true, true);
|
|
||||||
|
|
||||||
record = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "");
|
|
||||||
verifyUpdate(record, true, true, 11);
|
verifyUpdate(record, true, true, 11);
|
||||||
|
|
||||||
dml = "update \"" + FULL_TABLE_NAME
|
dml = "update \"" + FULL_TABLE_NAME
|
||||||
@ -308,8 +278,6 @@ public void shouldParseSpecialCharacters() throws Exception {
|
|||||||
|
|
||||||
String dml = "insert into \"" + FULL_TABLE_NAME + "\"(\"ID\",\"COL1\",\"COL2\",\"COL3\",\"COL4\",\"COL5\",\"COL6\",\"COL8\"," +
|
String dml = "insert into \"" + FULL_TABLE_NAME + "\"(\"ID\",\"COL1\",\"COL2\",\"COL3\",\"COL4\",\"COL5\",\"COL6\",\"COL8\"," +
|
||||||
"\"COL9\",\"COL10\") values ('5','4','\\','\\test',NULL,NULL,NULL,NULL,EMPTY_BLOB(),EMPTY_CLOB());";
|
"\"COL9\",\"COL10\") values ('5','4','\\','\\test',NULL,NULL,NULL,NULL,EMPTY_BLOB(),EMPTY_CLOB());";
|
||||||
antlrDmlParser.parse(dml, tables);
|
|
||||||
assertThat(antlrDmlParser.getDmlEntry()).isNotNull();
|
|
||||||
|
|
||||||
LogMinerDmlEntry result = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "1");
|
LogMinerDmlEntry result = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "1");
|
||||||
assertThat(result).isNotNull();
|
assertThat(result).isNotNull();
|
||||||
@ -319,8 +287,6 @@ public void shouldParseSpecialCharacters() throws Exception {
|
|||||||
dml = "delete from \"" + FULL_TABLE_NAME +
|
dml = "delete from \"" + FULL_TABLE_NAME +
|
||||||
"\" where id = 6 and col1 = 2 and col2 = 'te\\xt' and col3 = 'tExt\\' and col4 is null and col5 is null " +
|
"\" where id = 6 and col1 = 2 and col2 = 'te\\xt' and col3 = 'tExt\\' and col4 is null and col5 is null " +
|
||||||
" and col6 is null and col8 is null and col9 is null and col10 is null and col11 is null and col12 is null";
|
" and col6 is null and col8 is null and col9 is null and col10 is null and col11 is null and col12 is null";
|
||||||
antlrDmlParser.parse(dml, tables);
|
|
||||||
assertThat(antlrDmlParser.getDmlEntry()).isNotNull();
|
|
||||||
|
|
||||||
result = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "");
|
result = sqlDmlParser.parse(dml, tables.forTable(TABLE_ID), "");
|
||||||
assertThat(result).isNotNull();
|
assertThat(result).isNotNull();
|
||||||
|
Loading…
Reference in New Issue
Block a user