From ffa58f3a94113f6b7563c183ffc70ddd74f96519 Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Thu, 5 Sep 2024 06:22:47 -0400 Subject: [PATCH] DBZ-8208 Extend `error.processing.failure.handling.mode` to DML parsing --- .../AbstractLogMinerEventProcessor.java | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java index 34fc8a735..0be314f36 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java @@ -1517,11 +1517,30 @@ protected void addToTransaction(String transactionId, LogMinerEventRow row, Supp return; } + final LogMinerEvent event; + try { + event = eventSupplier.get(); + } + catch (DmlParserException e) { + switch (connectorConfig.getEventProcessingFailureHandlingMode()) { + case FAIL: + LOGGER.error("Failed to parse SQL for event '{}'", row); + throw e; + case WARN: + LOGGER.warn("Failed to parse SQL '{}'. The event '{}' is being ignored and skipped.", row.getRedoSql(), row); + return; + default: + // In this case, we explicitly log the situation in "debug" only and not as an error/warn. + LOGGER.debug("Failed to parse SQL for event '{}'. This event is being ignored and skipped.", row); + return; + } + } + final String eventKey = transaction.getEventId(transaction.getNextEventId()); if (!getEventCache().containsKey(eventKey)) { // Add new event at eventId offset LOGGER.trace("Transaction {}, adding event reference at key {}", transactionId, eventKey); - getEventCache().put(eventKey, eventSupplier.get()); + getEventCache().put(eventKey, event); metrics.calculateLagFromSource(row.getChangeTime()); inMemoryPendingTransactionsCache.putOrIncrement(transaction.getTransactionId()); }