DBZ-3328 Avoid copying in Oracle LogMiner DML parser

This commit is contained in:
Chris Cranford 2021-05-21 14:44:19 -04:00 committed by Gunnar Morling
parent 3149a95112
commit 52906cf154
2 changed files with 104 additions and 163 deletions

View File

@ -7,9 +7,8 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import io.debezium.DebeziumException;
import io.debezium.connector.oracle.logminer.RowMapper;
@ -17,6 +16,7 @@
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueImpl;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntryImpl;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
/**
@ -90,6 +90,22 @@ public LogMinerDmlEntry parse(String sql, Table table, String txId) {
throw new DmlParserException("Unknown supported SQL '" + sql + "'");
}
/**
 * Ensures every column of the table schema is represented in the supplied column value list.
 *
 * For each table column whose name does not match an entry already in {@code columnValues}, a new
 * {@link LogMinerColumnValueImpl} carrying only the column name is appended; no column data is set
 * on it, which marks the column as not provided by the DML statement. Existing entries are left
 * untouched and keep their position, so missing columns always end up after the parsed ones.
 *
 * @param columnValues the mutable list of parsed column values, appended to in place
 * @param table the relational table whose columns must all be present in the list
 */
private void addMissingColumns(List<LogMinerColumnValue> columnValues, Table table) {
    // Make sure that any column in the table schema that wasn't in the DML is added
    // but without any data to indicate it wasn't provided.
    for (Column column : table.columns()) {
        final String columnName = column.name();
        boolean found = false;
        for (LogMinerColumnValue columnValue : columnValues) {
            if (columnValue.getColumnName().equals(columnName)) {
                found = true;
                // the first match is enough; no need to scan the remaining values
                break;
            }
        }
        if (!found) {
            columnValues.add(new LogMinerColumnValueImpl(columnName));
        }
    }
}
/**
* Parse an {@code INSERT} SQL statement.
*
@ -106,26 +122,13 @@ private LogMinerDmlEntry parseInsert(String sql, Table table) {
index = parseTableName(sql, index);
// capture column names
List<String> columnNames = new ArrayList<>();
index = parseColumnListClause(sql, index, columnNames);
List<LogMinerColumnValue> newValues = new ArrayList<>(table.columns().size());
index = parseColumnListClause(sql, index, newValues);
// capture values
List<String> columnValues = new ArrayList<>(columnNames.size());
index = parseColumnValuesClause(sql, index, columnValues);
if (columnNames.size() != columnValues.size()) {
throw new DmlParserException("Columns: " + columnNames + ", Values: " + columnValues);
}
final Map<String, String> columnMap = createMap(columnNames, columnValues);
final List<LogMinerColumnValue> newValues = new ArrayList<>(columnNames.size());
for (int i = 0; i < table.columns().size(); ++i) {
String columnName = table.columns().get(i).name();
String columnValue = columnMap.get(columnName);
newValues.add(createColumnValue(columnName, columnValue));
}
index = parseColumnValuesClause(sql, index, newValues);
addMissingColumns(newValues, table);
return new LogMinerDmlEntryImpl(RowMapper.INSERT, newValues, Collections.emptyList());
}
catch (Exception e) {
@ -133,14 +136,6 @@ private LogMinerDmlEntry parseInsert(String sql, Table table) {
}
}
/**
 * Zips two parallel lists into an insertion-ordered map, pairing each key with the value found at
 * the same position. When a key occurs more than once the later value wins while the key keeps the
 * position of its first occurrence.
 *
 * @param keys the keys, in the order they should appear in the map
 * @param values the values; must hold at least as many elements as {@code keys}
 * @return a {@link LinkedHashMap} associating each key with its positional value
 */
private static <K, V> Map<K, V> createMap(List<K> keys, List<V> values) {
    final Map<K, V> mapping = new LinkedHashMap<>(keys.size());
    int position = 0;
    for (K key : keys) {
        mapping.put(key, values.get(position));
        position++;
    }
    return mapping;
}
/**
* Parse an {@code UPDATE} SQL statement.
*
@ -157,53 +152,32 @@ private LogMinerDmlEntry parseUpdate(String sql, Table table) {
index = parseTableName(sql, index);
// parse set
List<String> newColumnNames = new ArrayList<>();
List<String> newColumnValues = new ArrayList<>();
index = parseSetClause(sql, index, newColumnNames, newColumnValues);
List<LogMinerColumnValue> newValues = new ArrayList<>(table.columns().size());
index = parseSetClause(sql, index, newValues);
// parse where
List<String> oldColumnNames = new ArrayList<>();
List<String> oldColumnValues = new ArrayList<>();
parseWhereClause(sql, index, oldColumnNames, oldColumnValues);
final Map<String, String> beforeColumnMap = createMap(oldColumnNames, oldColumnValues);
final Map<String, String> afterColumnMap = createMap(newColumnNames, newColumnValues);
// set before
final List<LogMinerColumnValue> oldValues;
if (!beforeColumnMap.isEmpty()) {
oldValues = new ArrayList<>(table.columns().size());
for (int i = 0; i < table.columns().size(); ++i) {
String columnName = table.columns().get(i).name();
String columnValue = beforeColumnMap.get(columnName);
oldValues.add(createColumnValue(columnName, columnValue));
}
}
else {
oldValues = Collections.emptyList();
}
List<LogMinerColumnValue> oldValues = new ArrayList<>(table.columns().size());
parseWhereClause(sql, index, oldValues);
// set after
List<LogMinerColumnValue> newValues;
if (!afterColumnMap.isEmpty()) {
newValues = new ArrayList<>(table.columns().size());
if (!newValues.isEmpty()) {
for (int i = 0; i < table.columns().size(); ++i) {
String columnName = table.columns().get(i).name();
if (afterColumnMap.containsKey(columnName)) {
LogMinerColumnValue value = new LogMinerColumnValueImpl(columnName);
value.setColumnData(afterColumnMap.get(columnName));
newValues.add(value);
}
else {
LogMinerColumnValue value = new LogMinerColumnValueImpl(columnName);
value.setColumnData(beforeColumnMap.get(columnName));
final Column column = table.columns().get(i);
if (newValues.stream().noneMatch(c -> c.getColumnName().equals(column.name()))) {
LogMinerColumnValue value = new LogMinerColumnValueImpl(column.name());
Optional<LogMinerColumnValue> oldValue = oldValues.stream().filter(c -> c.getColumnName().equals(column.name())).findFirst();
oldValue.ifPresent(logMinerColumnValue -> {
value.setColumnData(logMinerColumnValue.getColumnData());
newValues.add(value);
});
}
}
}
else {
newValues = Collections.emptyList();
if (!oldValues.isEmpty()) {
addMissingColumns(oldValues, table);
}
addMissingColumns(newValues, table);
return new LogMinerDmlEntryImpl(RowMapper.UPDATE, newValues, oldValues);
}
@ -228,25 +202,12 @@ private LogMinerDmlEntry parseDelete(String sql, Table table) {
index = parseTableName(sql, index);
// parse where
List<String> columnNames = new ArrayList<>();
List<String> columnValues = new ArrayList<>();
parseWhereClause(sql, index, columnNames, columnValues);
List<LogMinerColumnValue> oldValues = new ArrayList<>(table.columns().size());
parseWhereClause(sql, index, oldValues);
final Map<String, String> beforeColumnMap = createMap(columnNames, columnValues);
List<LogMinerColumnValue> oldValues;
if (!beforeColumnMap.isEmpty()) {
oldValues = new ArrayList<>(columnNames.size());
for (int i = 0; i < table.columns().size(); ++i) {
String columnName = table.columns().get(i).name();
String columnValue = beforeColumnMap.get(columnName);
oldValues.add(createColumnValue(columnName, columnValue));
if (!oldValues.isEmpty()) {
addMissingColumns(oldValues, table);
}
}
else {
oldValues = Collections.emptyList();
}
return new LogMinerDmlEntryImpl(RowMapper.DELETE, Collections.emptyList(), oldValues);
}
catch (Exception e) {
@ -286,10 +247,10 @@ else if ((c == ' ' || c == '(') && !inQuote) {
*
* @param sql the sql statement
* @param start the index into the sql statement to begin parsing
* @param columnNames the list that will be populated with the column names
* @param columns the list that will be populated with the column names
* @return the index into the sql string where the column-list clause ended
*/
private int parseColumnListClause(String sql, int start, List<String> columnNames) {
private int parseColumnListClause(String sql, int start, List<LogMinerColumnValue> columns) {
int index = start;
boolean inQuote = false;
for (; index < sql.length(); ++index) {
@ -304,7 +265,7 @@ else if (c == ')' && !inQuote) {
else if (c == '"') {
if (inQuote) {
inQuote = false;
columnNames.add(sql.substring(start + 1, index));
columns.add(new LogMinerColumnValueImpl(sql.substring(start + 1, index)));
start = index + 2;
continue;
}
@ -319,21 +280,22 @@ else if (c == '"') {
*
* @param sql the sql statement
* @param start the index into the sql statement to begin parsing
* @param columnValues the list of that will populated with the column values
* @param columns the list of columns that will be populated with the column values
* @return the index into the sql string where the column-values clause ended
*/
private int parseColumnValuesClause(String sql, int start, List<String> columnValues) {
private int parseColumnValuesClause(String sql, int start, List<LogMinerColumnValue> columns) {
int index = start;
int nested = 0;
boolean inQuote = false;
boolean inValues = false;
// verify entering values-clause
if (!sql.substring(index, index + 8).equals(VALUES)) {
if (sql.indexOf(VALUES, index) != index) {
throw new DebeziumException("Failed to parse DML: " + sql);
}
index += VALUES_LENGTH;
int columnIndex = 0;
for (; index < sql.length(); ++index) {
char c = sql.charAt(index);
if (c == '(' && !inQuote && !inValues) {
@ -358,12 +320,20 @@ else if (!inQuote && (c == ',' || c == ')')) {
if (c == ',' && nested != 0) {
continue;
}
String s = sql.substring(start, index);
if (s.startsWith("'") && s.endsWith("'")) {
// if the value is single-quoted at the start/end, clear the quotes.
s = s.substring(1, s.length() - 1);
if (sql.charAt(start) == '\'' && sql.charAt(index - 1) == '\'') {
// value is single-quoted at the start/end, substring without the quotes.
columns.get(columnIndex).setColumnData(sql.substring(start + 1, index - 1));
}
columnValues.add(s.equals(UNSUPPORTED_TYPE) ? null : s);
else {
// use value as-is
String s = sql.substring(start, index);
if (!s.equals(UNSUPPORTED_TYPE) && !s.equals(NULL)) {
columns.get(columnIndex).setColumnData(s);
}
}
columnIndex++;
start = index + 1;
}
}
@ -376,11 +346,10 @@ else if (!inQuote && (c == ',' || c == ')')) {
*
* @param sql the sql statement
* @param start the index into the sql statement to begin parsing
* @param columnNames the list of the changed column names that will be populated
* @param columnValues the list of the changed column values that will be populated
* @param columns the list of the changed columns that will be populated
* @return the index into the sql string where the set-clause ended
*/
private int parseSetClause(String sql, int start, List<String> columnNames, List<String> columnValues) {
private int parseSetClause(String sql, int start, List<LogMinerColumnValue> columns) {
boolean inDoubleQuote = false;
boolean inSingleQuote = false;
boolean inColumnName = true;
@ -389,12 +358,13 @@ private int parseSetClause(String sql, int start, List<String> columnNames, List
int nested = 0;
// verify entering set-clause
if (!sql.substring(start, start + SET_LENGTH).equals(SET)) {
if (sql.indexOf(SET, start) != start) {
throw new DebeziumException("Failed to parse DML: " + sql);
}
start += SET_LENGTH;
int index = start;
int columnIndex = 0;
for (; index < sql.length(); ++index) {
char c = sql.charAt(index);
char lookAhead = (index + 1 < sql.length()) ? sql.charAt(index + 1) : 0;
@ -402,7 +372,7 @@ private int parseSetClause(String sql, int start, List<String> columnNames, List
// Set clause column names are double-quoted
if (inDoubleQuote) {
inDoubleQuote = false;
columnNames.add(sql.substring(start + 1, index));
columns.add(new LogMinerColumnValueImpl(sql.substring(start + 1, index)));
start = index + 1;
inColumnName = false;
continue;
@ -426,7 +396,7 @@ else if (c == '\'' && inColumnValue) {
if (inSingleQuote) {
inSingleQuote = false;
if (nested == 0) {
columnValues.add(sql.substring(start + 1, index));
columns.get(columnIndex++).setColumnData(sql.substring(start + 1, index));
start = index + 1;
inColumnValue = false;
inColumnName = false;
@ -459,7 +429,7 @@ else if (c == ')' && nested > 0) {
else if ((c == ',' || c == ' ' || c == ';') && nested == 0) {
String value = sql.substring(start, index);
if (value.equals(NULL) || value.equals(UNSUPPORTED_TYPE)) {
columnValues.add(null);
columnIndex++;
start = index + 1;
inColumnValue = false;
inSpecial = false;
@ -469,7 +439,7 @@ else if (c == ')' && nested > 0) {
else if (value.equals(UNSUPPORTED)) {
continue;
}
columnValues.add(sql.substring(start, index));
columns.get(columnIndex++).setColumnData(value);
start = index + 1;
inColumnValue = false;
inSpecial = false;
@ -477,8 +447,7 @@ else if (value.equals(UNSUPPORTED)) {
}
}
else if (!inDoubleQuote && !inSingleQuote) {
// else if (!inDoubleQuote && !inSingleQuote && sql.substring(index - 1, index + WHERE_LENGTH - 1).equals(WHERE)) {
if (c == 'w' && lookAhead == 'h' && sql.substring(index - 1, index + WHERE_LENGTH - 1).equals(WHERE)) {
if (c == 'w' && lookAhead == 'h' && sql.indexOf(WHERE, index - 1) == index - 1) {
index -= 1;
break;
}
@ -493,11 +462,10 @@ else if (!inDoubleQuote && !inSingleQuote) {
*
* @param sql the sql statement
* @param start the index into the sql statement to begin parsing
* @param columnNames the column names parsed from the clause
* @param columnValues the column values parsed from the clause
* @param columns the columns parsed from the clause
* @return the index into the sql string to continue parsing
*/
private int parseWhereClause(String sql, int start, List<String> columnNames, List<String> columnValues) {
private int parseWhereClause(String sql, int start, List<LogMinerColumnValue> columns) {
int nested = 0;
boolean inColumnName = true;
boolean inColumnValue = false;
@ -513,12 +481,13 @@ private int parseWhereClause(String sql, int start, List<String> columnNames, Li
}
// verify entering where-clause
if (!sql.substring(start, start + WHERE_LENGTH).equals(WHERE)) {
if (sql.indexOf(WHERE, start) != start) {
throw new DebeziumException("Failed to parse DML: " + sql);
}
start += WHERE_LENGTH;
int index = start;
int columnIndex = 0;
for (; index < sql.length(); ++index) {
char c = sql.charAt(index);
char lookAhead = (index + 1 < sql.length()) ? sql.charAt(index + 1) : 0;
@ -526,7 +495,7 @@ private int parseWhereClause(String sql, int start, List<String> columnNames, Li
// Where clause column names are double-quoted
if (inDoubleQuote) {
inDoubleQuote = false;
columnNames.add(sql.substring(start + 1, index));
columns.add(new LogMinerColumnValueImpl(sql.substring(start + 1, index)));
start = index + 1;
inColumnName = false;
continue;
@ -541,8 +510,8 @@ else if (c == '=' && !inColumnName && !inColumnValue) {
start = index + 1;
}
else if (c == 'I' && !inColumnName && !inColumnValue) {
if (sql.substring(index).startsWith(IS_NULL)) {
columnValues.add(null);
if (sql.indexOf(IS_NULL, index) == index) {
columnIndex++;
index += 6;
start = index;
continue;
@ -558,7 +527,7 @@ else if (c == '\'' && inColumnValue) {
if (inSingleQuote) {
inSingleQuote = false;
if (nested == 0) {
columnValues.add(sql.substring(start + 1, index));
columns.get(columnIndex++).setColumnData(sql.substring(start + 1, index));
start = index + 1;
inColumnValue = false;
inColumnName = false;
@ -584,7 +553,7 @@ else if (c == ')' && nested > 0) {
else if ((c == ';' || c == ' ') && nested == 0) {
String value = sql.substring(start, index);
if (value.equals(NULL) || value.equals(UNSUPPORTED_TYPE)) {
columnValues.add(null);
columnIndex++;
start = index + 1;
inColumnValue = false;
inSpecial = false;
@ -594,7 +563,7 @@ else if (c == ')' && nested > 0) {
else if (value.equals(UNSUPPORTED)) {
continue;
}
columnValues.add(sql.substring(start, index));
columns.get(columnIndex++).setColumnData(value);
start = index + 1;
inColumnValue = false;
inSpecial = false;
@ -602,12 +571,12 @@ else if (value.equals(UNSUPPORTED)) {
}
}
else if (!inColumnValue && !inColumnName) {
if (c == 'a' && lookAhead == 'n' && sql.substring(index).startsWith(AND)) {
if (c == 'a' && lookAhead == 'n' && sql.indexOf(AND, index) == index) {
index += 3;
start = index;
inColumnName = true;
}
else if (c == 'o' && lookAhead == 'r' && sql.substring(index).startsWith(OR)) {
else if (c == 'o' && lookAhead == 'r' && sql.indexOf(OR, index) == index) {
index += 2;
start = index;
inColumnName = true;
@ -617,32 +586,4 @@ else if (c == 'o' && lookAhead == 'r' && sql.substring(index).startsWith(OR)) {
return index;
}
/**
 * Remove {@code '} quotes from around the provided text if they exist; otherwise the value is returned as-is.
 *
 * The text is only unwrapped when it holds at least two characters, so a value that consists of a
 * lone quote character is returned unchanged instead of failing.
 *
 * @param text the text to remove single quotes from; must not be {@code null}
 * @return the text with the surrounding single quotes removed, or the text itself when it is not quoted
 */
private static String removeSingleQuotes(String text) {
    // A lone quote both starts and ends with the quote character; without the length check
    // substring(1, 0) would throw StringIndexOutOfBoundsException.
    // NOTE(review): assumes SINGLE_QUOTE is the one-character string "'" - confirm against its declaration.
    if (text.length() >= 2 && text.startsWith(SINGLE_QUOTE) && text.endsWith(SINGLE_QUOTE)) {
        return text.substring(1, text.length() - 1);
    }
    return text;
}
/**
 * Builds a {@link LogMinerColumnValue} for the given column name and optionally assigns its data.
 *
 * The column data is only set when {@code columnValue} is neither {@code null} nor equal to the
 * {@code NULL} constant; in both of those cases the returned object carries the name alone.
 *
 * @param columnName the column name
 * @param columnValue the column value, may be {@code null}
 * @return the LogMiner column value object
 */
private static LogMinerColumnValue createColumnValue(String columnName, String columnValue) {
    final LogMinerColumnValue result = new LogMinerColumnValueImpl(columnName);
    if (columnValue == null || columnValue.equals(NULL)) {
        // nothing to assign, leave the data unset
        return result;
    }
    result.setColumnData(columnValue);
    return result;
}
}

View File

@ -126,23 +126,23 @@ public void testParsingUpdate() throws Exception {
assertThat(entry.getOldValues().get(8).getColumnData()).isNull();
assertThat(entry.getOldValues().get(9).getColumnData()).isNull();
assertThat(entry.getNewValues()).hasSize(10);
assertThat(entry.getNewValues().get(0).getColumnName()).isEqualTo("ID");
assertThat(entry.getNewValues().get(1).getColumnName()).isEqualTo("NAME");
assertThat(entry.getNewValues().get(2).getColumnName()).isEqualTo("TS");
assertThat(entry.getNewValues().get(3).getColumnName()).isEqualTo("UT");
assertThat(entry.getNewValues().get(4).getColumnName()).isEqualTo("DATE");
assertThat(entry.getNewValues().get(5).getColumnName()).isEqualTo("UT2");
assertThat(entry.getNewValues().get(6).getColumnName()).isEqualTo("C1");
assertThat(entry.getNewValues().get(0).getColumnName()).isEqualTo("NAME");
assertThat(entry.getNewValues().get(1).getColumnName()).isEqualTo("TS");
assertThat(entry.getNewValues().get(2).getColumnName()).isEqualTo("UT");
assertThat(entry.getNewValues().get(3).getColumnName()).isEqualTo("DATE");
assertThat(entry.getNewValues().get(4).getColumnName()).isEqualTo("UT2");
assertThat(entry.getNewValues().get(5).getColumnName()).isEqualTo("C1");
assertThat(entry.getNewValues().get(6).getColumnName()).isEqualTo("ID");
assertThat(entry.getNewValues().get(7).getColumnName()).isEqualTo("IS");
assertThat(entry.getNewValues().get(8).getColumnName()).isEqualTo("IS2");
assertThat(entry.getNewValues().get(9).getColumnName()).isEqualTo("UNUSED");
assertThat(entry.getNewValues().get(0).getColumnData()).isEqualTo("1");
assertThat(entry.getNewValues().get(1).getColumnData()).isEqualTo("Bob");
assertThat(entry.getNewValues().get(2).getColumnData()).isEqualTo("TO_TIMESTAMP('2020-02-02 00:00:00.')");
assertThat(entry.getNewValues().get(3).getColumnData()).isNull();
assertThat(entry.getNewValues().get(4).getColumnData()).isEqualTo("TO_DATE('2020-02-02 00:00:00', 'YYYY-MM-DD HH24:MI:SS')");
assertThat(entry.getNewValues().get(0).getColumnData()).isEqualTo("Bob");
assertThat(entry.getNewValues().get(1).getColumnData()).isEqualTo("TO_TIMESTAMP('2020-02-02 00:00:00.')");
assertThat(entry.getNewValues().get(2).getColumnData()).isNull();
assertThat(entry.getNewValues().get(3).getColumnData()).isEqualTo("TO_DATE('2020-02-02 00:00:00', 'YYYY-MM-DD HH24:MI:SS')");
assertThat(entry.getNewValues().get(4).getColumnData()).isNull();
assertThat(entry.getNewValues().get(5).getColumnData()).isNull();
assertThat(entry.getNewValues().get(6).getColumnData()).isNull();
assertThat(entry.getNewValues().get(6).getColumnData()).isEqualTo("1");
assertThat(entry.getNewValues().get(7).getColumnData()).isNull();
assertThat(entry.getNewValues().get(8).getColumnData()).isNull();
assertThat(entry.getNewValues().get(9).getColumnData()).isNull();
@ -317,10 +317,10 @@ public void shouldParsingRedoSqlWithParenthesisInFunctionArgumentStrings() throw
assertThat(entry.getOldValues().get(1).getColumnName()).isEqualTo("C2");
assertThat(entry.getOldValues().get(1).getColumnData()).isNull();
assertThat(entry.getNewValues()).hasSize(2);
assertThat(entry.getNewValues().get(0).getColumnName()).isEqualTo("C1");
assertThat(entry.getNewValues().get(0).getColumnName()).isEqualTo("C2");
assertThat(entry.getNewValues().get(0).getColumnData())
.isEqualTo("UNISTR('\\963F\\72F8\\5C0F\\706B\\8F66\\5BB6\\5EAD\\7968(\\60CA\\559C\\FF09\\FF082161\\FF09')");
assertThat(entry.getNewValues().get(1).getColumnName()).isEqualTo("C2");
assertThat(entry.getNewValues().get(1).getColumnName()).isEqualTo("C1");
assertThat(entry.getNewValues().get(1).getColumnData())
.isEqualTo("UNISTR('\\963F\\72F8\\5C0F\\706B\\8F66\\5BB6\\5EAD\\7968(\\60CA\\559C\\FF09\\FF082161\\FF09')");
@ -364,10 +364,10 @@ public void testParsingDoubleSingleQuoteInWhereClause() throws Exception {
assertThat(entry.getOldValues().get(1).getColumnName()).isEqualTo("COL2");
assertThat(entry.getOldValues().get(1).getColumnData()).isEqualTo("0");
assertThat(entry.getNewValues()).hasSize(2);
assertThat(entry.getNewValues().get(0).getColumnName()).isEqualTo("COL1");
assertThat(entry.getNewValues().get(0).getColumnData()).isEqualTo("Bob''s dog");
assertThat(entry.getNewValues().get(1).getColumnName()).isEqualTo("COL2");
assertThat(entry.getNewValues().get(1).getColumnData()).isEqualTo("1");
assertThat(entry.getNewValues().get(0).getColumnName()).isEqualTo("COL2");
assertThat(entry.getNewValues().get(0).getColumnData()).isEqualTo("1");
assertThat(entry.getNewValues().get(1).getColumnName()).isEqualTo("COL1");
assertThat(entry.getNewValues().get(1).getColumnData()).isEqualTo("Bob''s dog");
sql = "delete from \"DEBEZIUM\".\"TEST\" where \"COL1\" = 'Bob''s dog' and \"COL2\" = '1';";
entry = fastDmlParser.parse(sql, table, null);