DBZ-3193 Fix IndexOutOfBoundsException with "IS NULL" predicate
This commit is contained in:
parent
2d919288f8
commit
07322485b6
@ -21,10 +21,10 @@
|
||||
import io.debezium.connector.oracle.OracleOffsetContext;
|
||||
import io.debezium.connector.oracle.OracleValueConverters;
|
||||
import io.debezium.connector.oracle.logminer.parser.DmlParser;
|
||||
import io.debezium.connector.oracle.logminer.parser.DmlParserException;
|
||||
import io.debezium.connector.oracle.logminer.parser.LogMinerDmlParser;
|
||||
import io.debezium.connector.oracle.logminer.parser.SimpleDmlParser;
|
||||
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
|
||||
import io.debezium.data.Envelope;
|
||||
import io.debezium.pipeline.EventDispatcher;
|
||||
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
|
||||
import io.debezium.relational.Table;
|
||||
@ -178,6 +178,11 @@ int processResult(ResultSet resultSet) {
|
||||
final TableId tableId = RowMapper.getTableId(connectorConfig.getCatalogName(), resultSet);
|
||||
|
||||
LOGGER.trace("DML, {}, sql {}", logMessage, redoSql);
|
||||
if (redoSql == null) {
|
||||
LOGGER.trace("Redo SQL was empty, DML operation skipped.");
|
||||
continue;
|
||||
}
|
||||
|
||||
dmlCounter++;
|
||||
switch (operationCode) {
|
||||
case RowMapper.INSERT:
|
||||
@ -191,24 +196,7 @@ int processResult(ResultSet resultSet) {
|
||||
break;
|
||||
}
|
||||
|
||||
Instant parseStart = Instant.now();
|
||||
LogMinerDmlEntry dmlEntry = dmlParser.parse(redoSql, schema.getTables(), tableId, txId);
|
||||
metrics.addCurrentParseTime(Duration.between(parseStart, Instant.now()));
|
||||
|
||||
if (dmlEntry == null || redoSql == null) {
|
||||
LOGGER.trace("Following statement was not parsed: {}, details: {}", redoSql, logMessage);
|
||||
continue;
|
||||
}
|
||||
|
||||
// this will happen for instance on a excluded column change, we will omit this update
|
||||
if (dmlEntry.getCommandType().equals(Envelope.Operation.UPDATE)
|
||||
&& dmlEntry.getOldValues().size() == dmlEntry.getNewValues().size()
|
||||
&& dmlEntry.getNewValues().containsAll(dmlEntry.getOldValues())) {
|
||||
LOGGER.trace("Following DML was skipped, " +
|
||||
"most likely because of ignored excluded column change: {}, details: {}", redoSql, logMessage);
|
||||
continue;
|
||||
}
|
||||
|
||||
final LogMinerDmlEntry dmlEntry = parse(redoSql, schema, tableId, txId);
|
||||
dmlEntry.setObjectOwner(segOwner);
|
||||
dmlEntry.setSourceTime(changeTime);
|
||||
dmlEntry.setTransactionId(txId);
|
||||
@ -287,4 +275,23 @@ private void warnStuckScn() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private LogMinerDmlEntry parse(String redoSql, OracleDatabaseSchema schema, TableId tableId, String txId) {
|
||||
LogMinerDmlEntry dmlEntry;
|
||||
try {
|
||||
Instant parseStart = Instant.now();
|
||||
dmlEntry = dmlParser.parse(redoSql, schema.getTables(), tableId, txId);
|
||||
metrics.addCurrentParseTime(Duration.between(parseStart, Instant.now()));
|
||||
}
|
||||
catch (DmlParserException e) {
|
||||
StringBuilder message = new StringBuilder();
|
||||
message.append("DML statement couldn't be parsed.");
|
||||
message.append(" Please open a Jira issue with the statement '").append(redoSql).append("'.");
|
||||
if (LogMiningDmlParser.FAST.equals(connectorConfig.getLogMiningDmlParser())) {
|
||||
message.append(" You can set internal.log.mining.dml.parser='legacy' as a workaround until the parse error is fixed.");
|
||||
}
|
||||
throw new DmlParserException(message.toString(), e);
|
||||
}
|
||||
return dmlEntry;
|
||||
}
|
||||
}
|
||||
|
@ -59,6 +59,7 @@ public class LogMinerDmlParser implements DmlParser {
|
||||
private static final String SET = " set ";
|
||||
private static final String WHERE = " where ";
|
||||
private static final String VALUES = " values ";
|
||||
private static final String IS_NULL = "IS NULL";
|
||||
// Use by Oracle for specific data types that cannot be represented in SQL
|
||||
private static final String UNSUPPORTED = "Unsupported";
|
||||
private static final String UNSUPPORTED_TYPE = "Unsupported Type";
|
||||
@ -477,6 +478,14 @@ else if (c == '=' && !inColumnName && !inColumnValue) {
|
||||
index += 1;
|
||||
start = index + 1;
|
||||
}
|
||||
else if (c == 'I' && !inColumnName && !inColumnValue) {
|
||||
if (sql.substring(index).startsWith(IS_NULL)) {
|
||||
columnValues.add(null);
|
||||
index += 6;
|
||||
start = index;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
else if (c == '\'' && inColumnValue && nested == 0) {
|
||||
// Where clause single-quoted column value
|
||||
if (inSingleQuote) {
|
||||
|
@ -38,14 +38,14 @@ public void beforeEach() throws Exception {
|
||||
@Test
|
||||
@FixFor("DBZ-3078")
|
||||
public void testParsingInsert() throws Exception {
|
||||
String sql = "insert into \"DEBEZIUM\".\"TEST\"(\"ID\",\"NAME\",\"TS\",\"UT\",\"DATE\",\"UT2\",\"C1\") values " +
|
||||
String sql = "insert into \"DEBEZIUM\".\"TEST\"(\"ID\",\"NAME\",\"TS\",\"UT\",\"DATE\",\"UT2\",\"C1\",\"C2\") values " +
|
||||
"('1','Acme',TO_TIMESTAMP('2020-02-01 00:00:00.'),Unsupported Type," +
|
||||
"TO_DATE('2020-02-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS'),Unsupported Type,NULL);";
|
||||
"TO_DATE('2020-02-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS'),Unsupported Type,NULL,NULL);";
|
||||
|
||||
LogMinerDmlEntry entry = fastDmlParser.parse(sql, null, null, null);
|
||||
assertThat(entry.getCommandType()).isEqualTo(Operation.CREATE);
|
||||
assertThat(entry.getOldValues()).isEmpty();
|
||||
assertThat(entry.getNewValues()).hasSize(7);
|
||||
assertThat(entry.getNewValues()).hasSize(8);
|
||||
assertThat(entry.getNewValues().get(0).getColumnName()).isEqualTo("ID");
|
||||
assertThat(entry.getNewValues().get(1).getColumnName()).isEqualTo("NAME");
|
||||
assertThat(entry.getNewValues().get(2).getColumnName()).isEqualTo("TS");
|
||||
@ -60,6 +60,7 @@ public void testParsingInsert() throws Exception {
|
||||
assertThat(entry.getNewValues().get(4).getColumnData()).isEqualTo("TO_DATE('2020-02-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS')");
|
||||
assertThat(entry.getNewValues().get(5).getColumnData()).isNull();
|
||||
assertThat(entry.getNewValues().get(6).getColumnData()).isNull();
|
||||
assertThat(entry.getNewValues().get(7).getColumnData()).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -70,11 +71,11 @@ public void testParsingUpdate() throws Exception {
|
||||
"\"DATE\" = TO_DATE('2020-02-02 00:00:00', 'YYYY-MM-DD HH24:MI:SS'), \"UT2\" = Unsupported Type, " +
|
||||
"\"C1\" = NULL where \"ID\" = '1' and \"NAME\" = 'Acme' and \"TS\" = TO_TIMESTAMP('2020-02-01 00:00:00.') and " +
|
||||
"\"UT\" = Unsupported Type and \"DATE\" = TO_DATE('2020-02-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS') and " +
|
||||
"\"UT2\" = Unsupported Type and \"C1\" = NULL;";
|
||||
"\"UT2\" = Unsupported Type and \"C1\" = NULL and \"IS\" IS NULL and \"IS2\" IS NULL;";
|
||||
|
||||
LogMinerDmlEntry entry = fastDmlParser.parse(sql, null, null, null);
|
||||
assertThat(entry.getCommandType()).isEqualTo(Operation.UPDATE);
|
||||
assertThat(entry.getOldValues()).hasSize(7);
|
||||
assertThat(entry.getOldValues()).hasSize(9);
|
||||
assertThat(entry.getOldValues().get(0).getColumnName()).isEqualTo("ID");
|
||||
assertThat(entry.getOldValues().get(1).getColumnName()).isEqualTo("NAME");
|
||||
assertThat(entry.getOldValues().get(2).getColumnName()).isEqualTo("TS");
|
||||
@ -88,7 +89,8 @@ public void testParsingUpdate() throws Exception {
|
||||
assertThat(entry.getOldValues().get(4).getColumnData()).isEqualTo("TO_DATE('2020-02-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS')");
|
||||
assertThat(entry.getOldValues().get(5).getColumnData()).isNull();
|
||||
assertThat(entry.getOldValues().get(6).getColumnData()).isNull();
|
||||
assertThat(entry.getNewValues()).hasSize(7);
|
||||
assertThat(entry.getOldValues().get(7).getColumnData()).isNull();
|
||||
assertThat(entry.getNewValues()).hasSize(9);
|
||||
assertThat(entry.getNewValues().get(0).getColumnName()).isEqualTo("ID");
|
||||
assertThat(entry.getNewValues().get(1).getColumnName()).isEqualTo("NAME");
|
||||
assertThat(entry.getNewValues().get(2).getColumnName()).isEqualTo("TS");
|
||||
@ -102,7 +104,8 @@ public void testParsingUpdate() throws Exception {
|
||||
assertThat(entry.getNewValues().get(4).getColumnData()).isEqualTo("TO_DATE('2020-02-02 00:00:00', 'YYYY-MM-DD HH24:MI:SS')");
|
||||
assertThat(entry.getNewValues().get(5).getColumnData()).isNull();
|
||||
assertThat(entry.getNewValues().get(6).getColumnData()).isNull();
|
||||
|
||||
assertThat(entry.getNewValues().get(7).getColumnData()).isNull();
|
||||
assertThat(entry.getNewValues().get(8).getColumnData()).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -110,11 +113,12 @@ public void testParsingUpdate() throws Exception {
|
||||
public void testParsingDelete() throws Exception {
|
||||
String sql = "delete from \"DEBEZIUM\".\"TEST\" " +
|
||||
"where \"ID\" = '1' and \"NAME\" = 'Acme' and \"TS\" = TO_TIMESTAMP('2020-02-01 00:00:00.') and " +
|
||||
"\"UT\" = Unsupported Type and \"DATE\" = TO_DATE('2020-02-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS');";
|
||||
"\"UT\" = Unsupported Type and \"DATE\" = TO_DATE('2020-02-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS') and " +
|
||||
"\"IS\" IS NULL and \"IS2\" IS NULL;";
|
||||
|
||||
LogMinerDmlEntry entry = fastDmlParser.parse(sql, null, null, null);
|
||||
assertThat(entry.getCommandType()).isEqualTo(Operation.DELETE);
|
||||
assertThat(entry.getOldValues()).hasSize(5);
|
||||
assertThat(entry.getOldValues()).hasSize(7);
|
||||
assertThat(entry.getOldValues().get(0).getColumnName()).isEqualTo("ID");
|
||||
assertThat(entry.getOldValues().get(1).getColumnName()).isEqualTo("NAME");
|
||||
assertThat(entry.getOldValues().get(2).getColumnName()).isEqualTo("TS");
|
||||
@ -125,6 +129,8 @@ public void testParsingDelete() throws Exception {
|
||||
assertThat(entry.getOldValues().get(2).getColumnData()).isEqualTo("TO_TIMESTAMP('2020-02-01 00:00:00.')");
|
||||
assertThat(entry.getOldValues().get(3).getColumnData()).isNull();
|
||||
assertThat(entry.getOldValues().get(4).getColumnData()).isEqualTo("TO_DATE('2020-02-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS')");
|
||||
assertThat(entry.getOldValues().get(5).getColumnData()).isNull();
|
||||
assertThat(entry.getOldValues().get(6).getColumnData()).isNull();
|
||||
assertThat(entry.getNewValues()).isEmpty();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user