DBZ-2948 Fix IndexOutOfBoundsException with LOB select parser

This commit is contained in:
Chris Cranford 2021-05-18 17:55:09 -04:00 committed by Gunnar Morling
parent 754462a436
commit 172c17d207
3 changed files with 267 additions and 19 deletions

View File

@ -164,16 +164,16 @@ void processResult(ResultSet resultSet) throws SQLException {
if (tableName != null) {
final TableId tableId = RowMapper.getTableId(connectorConfig.getCatalogName(), resultSet);
dispatcher.dispatchSchemaChangeEvent(tableId,
new OracleSchemaChangeEventEmitter(
connectorConfig,
offsetContext,
tableId,
tableId.catalog(),
tableId.schema(),
redoSql,
schema,
changeTime.toInstant(),
streamingMetrics));
new OracleSchemaChangeEventEmitter(
connectorConfig,
offsetContext,
tableId,
tableId.catalog(),
tableId.schema(),
redoSql,
schema,
changeTime.toInstant(),
streamingMetrics));
}
}
catch (InterruptedException e) {
@ -186,6 +186,7 @@ void processResult(ResultSet resultSet) throws SQLException {
break;
}
case RowMapper.SELECT_LOB_LOCATOR: {
LOGGER.trace("SEL_LOB_LOCATOR: {}, REDO_SQL: {}", logMessage, redoSql);
final TableId tableId = RowMapper.getTableId(connectorConfig.getCatalogName(), resultSet);
final LogMinerDmlEntry entry = selectLobParser.parse(redoSql);
entry.setObjectOwner(segOwner);

View File

@ -15,6 +15,7 @@
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueImpl;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntryImpl;
import io.debezium.text.ParsingException;
/**
* Simple text-based parser implementation for Oracle LogMiner SEL_LOB_LOCATOR Redo SQL.
@ -127,11 +128,25 @@ private int parseQuotedValue(String sql, int index, Consumer<String> collector)
private int parseColumnValue(String sql, int index, Consumer<String> collector) {
boolean inSingleQuotes = false;
int start = -1, last = -1;
int start = -1, last = -1, nested = 0;
for (int i = index; i < sql.length(); ++i) {
char c = sql.charAt(i);
char lookAhead = (index + 1 < sql.length()) ? sql.charAt(i + 1) : 0;
if (c == '\'') {
char lookAhead = (i + 1 < sql.length()) ? sql.charAt(i + 1) : 0;
if (i == index && c != '\'') {
start = i;
}
else if (c == '(' && !inSingleQuotes) {
nested++;
}
else if (c == ')' && !inSingleQuotes) {
nested--;
if (nested == 0) {
last = i + 1;
index = i + 1;
break;
}
}
else if (c == '\'') {
// skip over double single quote
if (inSingleQuotes && lookAhead == '\'') {
index += 1;
@ -139,17 +154,28 @@ private int parseColumnValue(String sql, int index, Consumer<String> collector)
}
if (inSingleQuotes) {
inSingleQuotes = false;
last = i;
index = i + 1;
break;
if (nested == 0) {
last = i;
index = i + 1;
break;
}
continue;
}
inSingleQuotes = true;
start = i + 1;
if (nested == 0) {
start = i + 1;
}
}
else if (c == ' ' && !inSingleQuotes && nested == 0) {
last = i;
index = i;
break;
}
}
if (start != -1 && last != -1) {
collector.accept(sql.substring(start, last));
final String value = sql.substring(start, last);
collector.accept(value.equalsIgnoreCase("null") ? null : value);
}
return index;
@ -160,7 +186,7 @@ private int parseWhere(String sql, int index) {
// parse column name
StringBuilder columnName = new StringBuilder();
index = parseQuotedValue(sql, index, columnName::append);
index += 3; // space, equals, space
index = parseOperator(sql, index);
final LogMinerColumnValueImpl column = new LogMinerColumnValueImpl(columnName.toString());
index = parseColumnValue(sql, index, column::setColumnData);
index += 1; // space
@ -183,6 +209,34 @@ else if (sql.indexOf(FOR_UPDATE, index) == index) {
return index;
}
private int parseOperator(String sql, int index) {
boolean initialSpace = false;
for (int i = index; i < sql.length(); ++i) {
char c = sql.charAt(i);
char lookAhead = (i + 1 < sql.length()) ? sql.charAt(i + 1) : 0;
if (!initialSpace && c == ' ') {
initialSpace = true;
}
else if (initialSpace && c == '=' && lookAhead == ' ') {
// equals operator
index += 3;
break;
}
else if (initialSpace && c == 'i' && lookAhead == 's') {
char lookAhead2 = (i + 2 < sql.length()) ? sql.charAt(i + 2) : 0;
if (lookAhead2 == ' ') {
index += 4;
break;
}
throw new ParsingException(null, "Expected 'is' at index " + i + ": " + sql);
}
else {
throw new ParsingException(null, "Failed to parse operator at index " + i + ": " + sql);
}
}
return index;
}
private void reset() {
columnName = null;
schemaName = null;

View File

@ -0,0 +1,193 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer;
import static org.fest.assertions.Assertions.assertThat;
import org.junit.Test;
import io.debezium.connector.oracle.logminer.parser.SelectLobParser;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.doc.FixFor;
/**
* Unit tests for the Oracle LogMiner {@code SEL_LOB_LOCATOR} operation parser, {@link SelectLobParser}.
*
* @author Chris Cranford
*/
public class SelectLobParserTest {
private static SelectLobParser parser = new SelectLobParser();
@Test
@FixFor("DBZ-2948")
public void shouldParseSimpleClobBasedLobSelect() throws Exception {
String redoSql = "DECLARE \n" +
" loc_c CLOB; \n" +
" buf_c VARCHAR2(6174); \n" +
" loc_b BLOB; \n" +
" buf_b RAW(6174); \n" +
" loc_nc NCLOB; \n" +
" buf_nc NVARCHAR2(6174); \n" +
"BEGIN\n" +
" select \"VAL_CLOB\" into loc_c from \"DEBEZIUM\".\"CLOB_TEST\" where \"ID\" = '2' and \"VAL_DATA\" = 'Test2' for update;";
LogMinerDmlEntry entry = parser.parse(redoSql);
assertThat(parser.isBinary()).isFalse();
assertThat(parser.getColumnName()).isEqualTo("VAL_CLOB");
assertThat(entry.getObjectOwner()).isEqualTo("DEBEZIUM");
assertThat(entry.getObjectName()).isEqualTo("CLOB_TEST");
assertThat(entry.getOperation()).isEqualTo(RowMapper.SELECT_LOB_LOCATOR);
assertThat(entry.getOldValues()).hasSize(2);
assertThat(entry.getOldValues().get(0).getColumnName()).isEqualTo("ID");
assertThat(entry.getOldValues().get(0).getColumnData()).isEqualTo("2");
assertThat(entry.getOldValues().get(1).getColumnName()).isEqualTo("VAL_DATA");
assertThat(entry.getOldValues().get(1).getColumnData()).isEqualTo("Test2");
assertThat(entry.getNewValues()).hasSize(2);
assertThat(entry.getNewValues().get(0).getColumnName()).isEqualTo("ID");
assertThat(entry.getNewValues().get(0).getColumnData()).isEqualTo("2");
assertThat(entry.getNewValues().get(1).getColumnName()).isEqualTo("VAL_DATA");
assertThat(entry.getNewValues().get(1).getColumnData()).isEqualTo("Test2");
}
@Test
@FixFor("DBZ-2948")
public void shouldParseSimpleBlobBasedLobSelect() throws Exception {
String redoSql = "DECLARE \n" +
" loc_c CLOB; \n" +
" buf_c VARCHAR2(6174); \n" +
" loc_b BLOB; \n" +
" buf_b RAW(6174); \n" +
" loc_nc NCLOB; \n" +
" buf_nc NVARCHAR2(6174); \n" +
"BEGIN\n" +
" select \"VAL_BLOB\" into loc_b from \"DEBEZIUM\".\"BLOB_TEST\" where \"ID\" = '2' and \"VAL_DATA\" = 'Test2' for update;";
LogMinerDmlEntry entry = parser.parse(redoSql);
assertThat(parser.isBinary()).isTrue();
assertThat(parser.getColumnName()).isEqualTo("VAL_BLOB");
assertThat(entry.getObjectOwner()).isEqualTo("DEBEZIUM");
assertThat(entry.getObjectName()).isEqualTo("BLOB_TEST");
assertThat(entry.getOperation()).isEqualTo(RowMapper.SELECT_LOB_LOCATOR);
assertThat(entry.getOldValues()).hasSize(2);
assertThat(entry.getOldValues().get(0).getColumnName()).isEqualTo("ID");
assertThat(entry.getOldValues().get(0).getColumnData()).isEqualTo("2");
assertThat(entry.getOldValues().get(1).getColumnName()).isEqualTo("VAL_DATA");
assertThat(entry.getOldValues().get(1).getColumnData()).isEqualTo("Test2");
assertThat(entry.getNewValues()).hasSize(2);
assertThat(entry.getNewValues().get(0).getColumnName()).isEqualTo("ID");
assertThat(entry.getNewValues().get(0).getColumnData()).isEqualTo("2");
assertThat(entry.getNewValues().get(1).getColumnName()).isEqualTo("VAL_DATA");
assertThat(entry.getNewValues().get(1).getColumnData()).isEqualTo("Test2");
}
@Test
@FixFor("DBZ-2948")
public void shouldParseComplexClobBasedLobSelect() throws Exception {
String redoSql = "DECLARE \n" +
" loc_c CLOB; \n" +
" buf_c VARCHAR2(6426); \n" +
" loc_b BLOB; \n" +
" buf_b RAW(6426); \n" +
" loc_nc NCLOB; \n" +
" buf_nc NVARCHAR2(6426); \n" +
"BEGIN\n" +
" select \"CLOB_COL\" into loc_c from \"DEBEZIUM\".\"BIG_TABLE\" where \"ID\" = '651900002' and \"NAME\" = " +
"'person number 651900002' and \"AGE\" = '125' and \"ADRESS\" = 'street:651900002 av: 651900002 house: 651900002'" +
" and \"TD\" = TO_DATE('15-MAY-21', 'DD-MON-RR') and \"FLAG\" is null for update;";
LogMinerDmlEntry entry = parser.parse(redoSql);
assertThat(parser.isBinary()).isFalse();
assertThat(parser.getColumnName()).isEqualTo("CLOB_COL");
assertThat(entry.getObjectOwner()).isEqualTo("DEBEZIUM");
assertThat(entry.getObjectName()).isEqualTo("BIG_TABLE");
assertThat(entry.getOperation()).isEqualTo(RowMapper.SELECT_LOB_LOCATOR);
assertThat(entry.getOldValues()).hasSize(6);
assertThat(entry.getOldValues().get(0).getColumnName()).isEqualTo("ID");
assertThat(entry.getOldValues().get(0).getColumnData()).isEqualTo("651900002");
assertThat(entry.getOldValues().get(1).getColumnName()).isEqualTo("NAME");
assertThat(entry.getOldValues().get(1).getColumnData()).isEqualTo("person number 651900002");
assertThat(entry.getOldValues().get(2).getColumnName()).isEqualTo("AGE");
assertThat(entry.getOldValues().get(2).getColumnData()).isEqualTo("125");
assertThat(entry.getOldValues().get(3).getColumnName()).isEqualTo("ADRESS");
assertThat(entry.getOldValues().get(3).getColumnData()).isEqualTo("street:651900002 av: 651900002 house: 651900002");
assertThat(entry.getOldValues().get(4).getColumnName()).isEqualTo("TD");
assertThat(entry.getOldValues().get(4).getColumnData()).isEqualTo("TO_DATE('15-MAY-21', 'DD-MON-RR')");
assertThat(entry.getOldValues().get(5).getColumnName()).isEqualTo("FLAG");
assertThat(entry.getOldValues().get(5).getColumnData()).isNull();
assertThat(entry.getOldValues()).hasSize(6);
assertThat(entry.getNewValues().get(0).getColumnName()).isEqualTo("ID");
assertThat(entry.getNewValues().get(0).getColumnData()).isEqualTo("651900002");
assertThat(entry.getNewValues().get(1).getColumnName()).isEqualTo("NAME");
assertThat(entry.getNewValues().get(1).getColumnData()).isEqualTo("person number 651900002");
assertThat(entry.getNewValues().get(2).getColumnName()).isEqualTo("AGE");
assertThat(entry.getNewValues().get(2).getColumnData()).isEqualTo("125");
assertThat(entry.getNewValues().get(3).getColumnName()).isEqualTo("ADRESS");
assertThat(entry.getNewValues().get(3).getColumnData()).isEqualTo("street:651900002 av: 651900002 house: 651900002");
assertThat(entry.getNewValues().get(4).getColumnName()).isEqualTo("TD");
assertThat(entry.getNewValues().get(4).getColumnData()).isEqualTo("TO_DATE('15-MAY-21', 'DD-MON-RR')");
assertThat(entry.getNewValues().get(5).getColumnName()).isEqualTo("FLAG");
assertThat(entry.getNewValues().get(5).getColumnData()).isNull();
}
@Test
@FixFor("DBZ-2948")
public void shouldParseComplexBlobBasedLobSelect() throws Exception {
String redoSql = "DECLARE \n" +
" loc_c CLOB; \n" +
" buf_c VARCHAR2(6426); \n" +
" loc_b BLOB; \n" +
" buf_b RAW(6426); \n" +
" loc_nc NCLOB; \n" +
" buf_nc NVARCHAR2(6426); \n" +
"BEGIN\n" +
" select \"BLOB_COL\" into loc_b from \"DEBEZIUM\".\"BIG_TABLE\" where \"ID\" = '651900002' and \"NAME\" = " +
"'person number 651900002' and \"AGE\" = '125' and \"ADRESS\" = 'street:651900002 av: 651900002 house: 651900002'" +
" and \"TD\" = TO_DATE('15-MAY-21', 'DD-MON-RR') and \"FLAG\" is null for update;";
LogMinerDmlEntry entry = parser.parse(redoSql);
assertThat(parser.isBinary()).isTrue();
assertThat(parser.getColumnName()).isEqualTo("BLOB_COL");
assertThat(entry.getObjectOwner()).isEqualTo("DEBEZIUM");
assertThat(entry.getObjectName()).isEqualTo("BIG_TABLE");
assertThat(entry.getOperation()).isEqualTo(RowMapper.SELECT_LOB_LOCATOR);
assertThat(entry.getOldValues()).hasSize(6);
assertThat(entry.getOldValues().get(0).getColumnName()).isEqualTo("ID");
assertThat(entry.getOldValues().get(0).getColumnData()).isEqualTo("651900002");
assertThat(entry.getOldValues().get(1).getColumnName()).isEqualTo("NAME");
assertThat(entry.getOldValues().get(1).getColumnData()).isEqualTo("person number 651900002");
assertThat(entry.getOldValues().get(2).getColumnName()).isEqualTo("AGE");
assertThat(entry.getOldValues().get(2).getColumnData()).isEqualTo("125");
assertThat(entry.getOldValues().get(3).getColumnName()).isEqualTo("ADRESS");
assertThat(entry.getOldValues().get(3).getColumnData()).isEqualTo("street:651900002 av: 651900002 house: 651900002");
assertThat(entry.getOldValues().get(4).getColumnName()).isEqualTo("TD");
assertThat(entry.getOldValues().get(4).getColumnData()).isEqualTo("TO_DATE('15-MAY-21', 'DD-MON-RR')");
assertThat(entry.getOldValues().get(5).getColumnName()).isEqualTo("FLAG");
assertThat(entry.getOldValues().get(5).getColumnData()).isNull();
assertThat(entry.getOldValues()).hasSize(6);
assertThat(entry.getNewValues().get(0).getColumnName()).isEqualTo("ID");
assertThat(entry.getNewValues().get(0).getColumnData()).isEqualTo("651900002");
assertThat(entry.getNewValues().get(1).getColumnName()).isEqualTo("NAME");
assertThat(entry.getNewValues().get(1).getColumnData()).isEqualTo("person number 651900002");
assertThat(entry.getNewValues().get(2).getColumnName()).isEqualTo("AGE");
assertThat(entry.getNewValues().get(2).getColumnData()).isEqualTo("125");
assertThat(entry.getNewValues().get(3).getColumnName()).isEqualTo("ADRESS");
assertThat(entry.getNewValues().get(3).getColumnData()).isEqualTo("street:651900002 av: 651900002 house: 651900002");
assertThat(entry.getNewValues().get(4).getColumnName()).isEqualTo("TD");
assertThat(entry.getNewValues().get(4).getColumnData()).isEqualTo("TO_DATE('15-MAY-21', 'DD-MON-RR')");
assertThat(entry.getNewValues().get(5).getColumnName()).isEqualTo("FLAG");
assertThat(entry.getNewValues().get(5).getColumnData()).isNull();
}
}