DBZ-8208 Extend error.processing.failure.handling.mode to DML parsing

This commit is contained in:
Chris Cranford 2024-09-05 06:22:47 -04:00 committed by Jiri Pechanec
parent 16d2d983a0
commit ffa58f3a94

View File

@ -1517,11 +1517,30 @@ protected void addToTransaction(String transactionId, LogMinerEventRow row, Supp
return; return;
} }
final LogMinerEvent event;
try {
event = eventSupplier.get();
}
catch (DmlParserException e) {
switch (connectorConfig.getEventProcessingFailureHandlingMode()) {
case FAIL:
LOGGER.error("Failed to parse SQL for event '{}'", row);
throw e;
case WARN:
LOGGER.warn("Failed to parse SQL '{}'. The event '{}' is being ignored and skipped.", row.getRedoSql(), row);
return;
default:
// In this case, we explicitly log the situation in "debug" only and not as an error/warn.
LOGGER.debug("Failed to parse SQL for event '{}'. This event is being ignored and skipped.", row);
return;
}
}
final String eventKey = transaction.getEventId(transaction.getNextEventId()); final String eventKey = transaction.getEventId(transaction.getNextEventId());
if (!getEventCache().containsKey(eventKey)) { if (!getEventCache().containsKey(eventKey)) {
// Add new event at eventId offset // Add new event at eventId offset
LOGGER.trace("Transaction {}, adding event reference at key {}", transactionId, eventKey); LOGGER.trace("Transaction {}, adding event reference at key {}", transactionId, eventKey);
getEventCache().put(eventKey, eventSupplier.get()); getEventCache().put(eventKey, event);
metrics.calculateLagFromSource(row.getChangeTime()); metrics.calculateLagFromSource(row.getChangeTime());
inMemoryPendingTransactionsCache.putOrIncrement(transaction.getTransactionId()); inMemoryPendingTransactionsCache.putOrIncrement(transaction.getTransactionId());
} }