From f667402fe09f0f98dfe49ecf1f262442af3a1658 Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Thu, 14 Apr 2022 09:49:24 -0400 Subject: [PATCH] DBZ-4987 Propagate up all exceptions from LCR handler --- .../oracle/xstream/LcrEventHandler.java | 52 +++++++++---------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/LcrEventHandler.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/LcrEventHandler.java index 9a12dac1f..2436f5a14 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/LcrEventHandler.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/LcrEventHandler.java @@ -84,33 +84,33 @@ public LcrEventHandler(OracleConnectorConfig connectorConfig, ErrorHandler error @Override public void processLCR(LCR lcr) throws StreamsException { LOGGER.trace("Received LCR {}", lcr); - - // First set watermark to flush messages seen - setWatermark(); - columnChunks.clear(); - - final LcrPosition lcrPosition = new LcrPosition(lcr.getPosition()); - - // After a restart it may happen we get the event with the last processed LCR again - LcrPosition offsetLcrPosition = LcrPosition.valueOf(offsetContext.getLcrPosition()); - if (lcrPosition.compareTo(offsetLcrPosition) <= 0) { - if (LOGGER.isDebugEnabled()) { - final LcrPosition recPosition = offsetLcrPosition; - LOGGER.debug("Ignoring change event with already processed SCN/LCR Position {}/{}, last recorded {}/{}", - lcrPosition, - lcrPosition.getScn(), - recPosition != null ? recPosition : "none", - recPosition != null ? recPosition.getScn() : "none"); - } - return; - } - - offsetContext.setScn(lcrPosition.getScn()); - offsetContext.setLcrPosition(lcrPosition.toString()); - offsetContext.setTransactionId(lcr.getTransactionId()); - offsetContext.tableEvent(new TableId(lcr.getSourceDatabaseName(), lcr.getObjectOwner(), lcr.getObjectName()), lcr.getSourceTime().timestampValue().toInstant()); - try { + // First set watermark to flush messages seen + setWatermark(); + columnChunks.clear(); + + final LcrPosition lcrPosition = new LcrPosition(lcr.getPosition()); + + // After a restart it may happen we get the event with the last processed LCR again + LcrPosition offsetLcrPosition = LcrPosition.valueOf(offsetContext.getLcrPosition()); + if (lcrPosition.compareTo(offsetLcrPosition) <= 0) { + if (LOGGER.isDebugEnabled()) { + final LcrPosition recPosition = offsetLcrPosition; + LOGGER.debug("Ignoring change event with already processed SCN/LCR Position {}/{}, last recorded {}/{}", + lcrPosition, + lcrPosition.getScn(), + recPosition != null ? recPosition : "none", + recPosition != null ? recPosition.getScn() : "none"); + } + return; + } + + offsetContext.setScn(lcrPosition.getScn()); + offsetContext.setLcrPosition(lcrPosition.toString()); + offsetContext.setTransactionId(lcr.getTransactionId()); + offsetContext.tableEvent(new TableId(lcr.getSourceDatabaseName(), lcr.getObjectOwner(), lcr.getObjectName()), + lcr.getSourceTime().timestampValue().toInstant()); + if (lcr instanceof RowLCR) { processRowLCR((RowLCR) lcr); }