DBZ-3235 Do not fail parsing Oracle LogMiner SQL without a WHERE-clause

This commit is contained in:
Chris Cranford 2021-03-05 09:38:50 -05:00 committed by Jiri Pechanec
parent 323b23498e
commit 09fa9d38ab
3 changed files with 62 additions and 9 deletions

View File

@ -25,6 +25,7 @@
import io.debezium.connector.oracle.logminer.parser.LogMinerDmlParser;
import io.debezium.connector.oracle.logminer.parser.SimpleDmlParser;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.data.Envelope.Operation;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
import io.debezium.relational.Table;
@ -292,6 +293,14 @@ private LogMinerDmlEntry parse(String redoSql, OracleDatabaseSchema schema, Tabl
}
throw new DmlParserException(message.toString(), e);
}
if (dmlEntry.getOldValues().isEmpty()) {
if (Operation.UPDATE.equals(dmlEntry.getCommandType()) || Operation.DELETE.equals(dmlEntry.getCommandType())) {
LOGGER.warn("The DML event '{}' contained no before state.", redoSql);
transactionalBufferMetrics.incrementWarningCounter();
}
}
return dmlEntry;
}
}

View File

@ -156,17 +156,26 @@ private LogMinerDmlEntry parseUpdate(String sql) {
}
List<LogMinerColumnValue> newValues = new ArrayList<>(newColumnNames.size());
for (LogMinerColumnValue oldValue : oldValues) {
boolean found = false;
for (int j = 0; j < newColumnNames.size(); ++j) {
if (newColumnNames.get(j).equals(oldValue.getColumnName())) {
newValues.add(createColumnValue(newColumnNames.get(j), newColumnValues.get(j)));
found = true;
break;
if (!oldValues.isEmpty()) {
for (LogMinerColumnValue oldValue : oldValues) {
boolean found = false;
for (int j = 0; j < newColumnNames.size(); ++j) {
if (newColumnNames.get(j).equals(oldValue.getColumnName())) {
newValues.add(createColumnValue(newColumnNames.get(j), newColumnValues.get(j)));
found = true;
break;
}
}
if (!found) {
newValues.add(oldValue);
}
}
if (!found) {
newValues.add(oldValue);
}
else {
for (int i = 0; i < newColumnNames.size(); ++i) {
LogMinerColumnValue value = new LogMinerColumnValueImpl(newColumnNames.get(i), 0);
value.setColumnData(newColumnValues.get(i));
newValues.add(value);
}
}
@ -450,6 +459,13 @@ private int parseWhereClause(String sql, int start, List<String> columnNames, Li
boolean inSingleQuote = false;
boolean inSpecial = false;
// DBZ-3235
// LogMiner can generate SQL without a WHERE condition under some circumstances and if it does
// we shouldn't immediately fail DML parsing.
if (start >= sql.length()) {
return start;
}
// verify entering where-clause
if (!sql.substring(start, start + WHERE_LENGTH).equals(WHERE)) {
throw new DebeziumException("Failed to parse DML: " + sql);

View File

@ -133,4 +133,32 @@ public void testParsingDelete() throws Exception {
assertThat(entry.getOldValues().get(6).getColumnData()).isNull();
assertThat(entry.getNewValues()).isEmpty();
}
@Test
@FixFor("DBZ-3235")
public void testParsingUpdateWithNoWhereClauseIsAcceptable() throws Exception {
String sql = "update \"DEBEZIUM\".\"TEST\" set \"COL1\" = '1', \"COL2\" = NULL, \"COL3\" = 'Hello';";
LogMinerDmlEntry entry = fastDmlParser.parse(sql, null, null, null);
assertThat(entry.getCommandType()).isEqualTo(Operation.UPDATE);
assertThat(entry.getOldValues()).isEmpty();
assertThat(entry.getNewValues()).hasSize(3);
assertThat(entry.getNewValues().get(0).getColumnName()).isEqualTo("COL1");
assertThat(entry.getNewValues().get(0).getColumnData()).isEqualTo("1");
assertThat(entry.getNewValues().get(1).getColumnName()).isEqualTo("COL2");
assertThat(entry.getNewValues().get(1).getColumnData()).isNull();
assertThat(entry.getNewValues().get(2).getColumnName()).isEqualTo("COL3");
assertThat(entry.getNewValues().get(2).getColumnData()).isEqualTo("Hello");
}
@Test
@FixFor("DBZ-3235")
public void testParsingDeleteWithNoWhereClauseIsAcceptable() throws Exception {
String sql = "delete from \"DEBEZIUM\".\"TEST\";";
LogMinerDmlEntry entry = fastDmlParser.parse(sql, null, null, null);
assertThat(entry.getCommandType()).isEqualTo(Operation.DELETE);
assertThat(entry.getOldValues()).isEmpty();
assertThat(entry.getNewValues()).isEmpty();
}
}