DBZ-4987 Propagate up all exceptions from LCR handler

This commit is contained in:
Chris Cranford 2022-04-14 09:49:24 -04:00 committed by Jiri Pechanec
parent d734f9740e
commit f667402fe0

View File

@ -84,33 +84,33 @@ public LcrEventHandler(OracleConnectorConfig connectorConfig, ErrorHandler error
@Override @Override
public void processLCR(LCR lcr) throws StreamsException { public void processLCR(LCR lcr) throws StreamsException {
LOGGER.trace("Received LCR {}", lcr); LOGGER.trace("Received LCR {}", lcr);
// First set watermark to flush messages seen
setWatermark();
columnChunks.clear();
final LcrPosition lcrPosition = new LcrPosition(lcr.getPosition());
// After a restart it may happen we get the event with the last processed LCR again
LcrPosition offsetLcrPosition = LcrPosition.valueOf(offsetContext.getLcrPosition());
if (lcrPosition.compareTo(offsetLcrPosition) <= 0) {
if (LOGGER.isDebugEnabled()) {
final LcrPosition recPosition = offsetLcrPosition;
LOGGER.debug("Ignoring change event with already processed SCN/LCR Position {}/{}, last recorded {}/{}",
lcrPosition,
lcrPosition.getScn(),
recPosition != null ? recPosition : "none",
recPosition != null ? recPosition.getScn() : "none");
}
return;
}
offsetContext.setScn(lcrPosition.getScn());
offsetContext.setLcrPosition(lcrPosition.toString());
offsetContext.setTransactionId(lcr.getTransactionId());
offsetContext.tableEvent(new TableId(lcr.getSourceDatabaseName(), lcr.getObjectOwner(), lcr.getObjectName()), lcr.getSourceTime().timestampValue().toInstant());
try { try {
// First set watermark to flush messages seen
setWatermark();
columnChunks.clear();
final LcrPosition lcrPosition = new LcrPosition(lcr.getPosition());
// After a restart it may happen we get the event with the last processed LCR again
LcrPosition offsetLcrPosition = LcrPosition.valueOf(offsetContext.getLcrPosition());
if (lcrPosition.compareTo(offsetLcrPosition) <= 0) {
if (LOGGER.isDebugEnabled()) {
final LcrPosition recPosition = offsetLcrPosition;
LOGGER.debug("Ignoring change event with already processed SCN/LCR Position {}/{}, last recorded {}/{}",
lcrPosition,
lcrPosition.getScn(),
recPosition != null ? recPosition : "none",
recPosition != null ? recPosition.getScn() : "none");
}
return;
}
offsetContext.setScn(lcrPosition.getScn());
offsetContext.setLcrPosition(lcrPosition.toString());
offsetContext.setTransactionId(lcr.getTransactionId());
offsetContext.tableEvent(new TableId(lcr.getSourceDatabaseName(), lcr.getObjectOwner(), lcr.getObjectName()),
lcr.getSourceTime().timestampValue().toInstant());
if (lcr instanceof RowLCR) { if (lcr instanceof RowLCR) {
processRowLCR((RowLCR) lcr); processRowLCR((RowLCR) lcr);
} }