diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java index a568403ce..417c7edd1 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java @@ -62,6 +62,10 @@ public class OracleValueConverters extends JdbcValueConverters { * Marker value indicating an unavilable column value. */ public static final Object UNAVAILABLE_VALUE = new Object(); + public static final String EMPTY_BLOB_FUNCTION = "EMPTY_BLOB()"; + public static final String EMPTY_CLOB_FUNCTION = "EMPTY_CLOB()"; + public static final String HEXTORAW_FUNCTION_START = "HEXTORAW('"; + public static final String HEXTORAW_FUNCTION_END = "')"; private static final Pattern INTERVAL_DAY_SECOND_PATTERN = Pattern.compile("([+\\-])?(\\d+) (\\d+):(\\d+):(\\d+).(\\d+)"); @@ -99,11 +103,6 @@ public class OracleValueConverters extends JdbcValueConverters { .appendOffset("+HH:MM", "") .toFormatter(); - private static final String EMPTY_BLOB_FUNCTION = "EMPTY_BLOB()"; - private static final String EMPTY_CLOB_FUNCTION = "EMPTY_CLOB()"; - private static final String HEXTORAW_FUNCTION_START = "HEXTORAW('"; - private static final String HEXTORAW_FUNCTION_END = "')"; - private static final Pattern TO_TIMESTAMP = Pattern.compile("TO_TIMESTAMP\\('(.*)'\\)", Pattern.CASE_INSENSITIVE); private static final Pattern TO_TIMESTAMP_TZ = Pattern.compile("TO_TIMESTAMP_TZ\\('(.*)'\\)", Pattern.CASE_INSENSITIVE); private static final Pattern TO_DATE = Pattern.compile("TO_DATE\\('(.*)',[ ]*'(.*)'\\)", Pattern.CASE_INSENSITIVE); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/TransactionCommitConsumer.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/TransactionCommitConsumer.java index 80c3e609e..d4eeac7c8 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/TransactionCommitConsumer.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/TransactionCommitConsumer.java @@ -74,22 +74,12 @@ public class TransactionCommitConsumer implements AutoCloseable, BlockingConsume private final BlockingConsumer delegate; private final OracleConnectorConfig connectorConfig; private final OracleDatabaseSchema schema; - - private class RowState { - final DmlEvent event; - final int transactionIndex; - - RowState(final DmlEvent event, final int transactionIndex) { - this.event = event; - this.transactionIndex = transactionIndex; - } - } - private final Map rows = new HashMap<>(); private String currentLobRowId; private String currentLobColumnName; private int currentLobColumnPosition = -1; + private int transactionIndex = 0; public TransactionCommitConsumer(BlockingConsumer delegate, OracleConnectorConfig connectorConfig, OracleDatabaseSchema schema) { this.delegate = delegate; @@ -100,7 +90,7 @@ public TransactionCommitConsumer(BlockingConsumer delegate, Oracl @Override public void close() throws InterruptedException { // dispatch the remaining events in the order we received them from LogMiner - List pending = new ArrayList(rows.values()); + List pending = new ArrayList<>(rows.values()); Collections.sort(pending, (a, b) -> a.transactionIndex - b.transactionIndex); for (final RowState rowState : pending) { prepareAndDispatch(rowState.event); @@ -123,8 +113,6 @@ public void accept(LogMinerEvent event) throws InterruptedException { } } - private int transactionIndex = 0; - private void acceptDmlEvent(DmlEvent event) throws InterruptedException { transactionIndex++; @@ -316,7 +304,8 @@ static class LobFragment { } private void initializeFromData(String data) { - this.binary = data.startsWith("HEXTORAW('") && data.endsWith("')"); + this.binary = data.startsWith(OracleValueConverters.HEXTORAW_FUNCTION_START) + && data.endsWith(OracleValueConverters.HEXTORAW_FUNCTION_END); if (this.binary) { try { this.bytes = RAW.hexString2Bytes(data.substring(10, data.length() - 2)); @@ -406,9 +395,7 @@ void append(LobFragment other) { static String spaces(int length) { char[] backing = new char[length]; - for (int i = 0; i < backing.length; i++) { - backing[i] = ' '; - } + Arrays.fill(backing, ' '); return new String(backing); } } @@ -527,11 +514,11 @@ Object merge() { if (isNull) { return null; } - if (end == 0 && binary) { - return "EMPTY_BLOB()"; - } - if (end == 0 && !binary) { - return "EMPTY_CLOB()"; + if (end == 0) { + if (binary) { + return OracleValueConverters.EMPTY_BLOB_FUNCTION; + } + return OracleValueConverters.EMPTY_CLOB_FUNCTION; } if (binary) { @@ -583,11 +570,11 @@ static LobUnderConstruction fromInitialValue(Object value) { if (value instanceof String) { String strval = (String) value; LobUnderConstruction lob = new LobUnderConstruction(); - if (strval.equals("EMPTY_BLOB()")) { + if (OracleValueConverters.EMPTY_BLOB_FUNCTION.equals(strval)) { lob.binary = true; lob.isNull = false; // lob must be emitted } - else if (strval.equals("EMPTY_CLOB()")) { + else if (OracleValueConverters.EMPTY_CLOB_FUNCTION.equals(strval)) { lob.binary = false; lob.isNull = false; // lob must be emitted } @@ -601,4 +588,14 @@ else if (strval.equals("EMPTY_CLOB()")) { return new LobUnderConstruction(); } } + + private static class RowState { + final DmlEvent event; + final int transactionIndex; + + RowState(final DmlEvent event, final int transactionIndex) { + this.event = event; + this.transactionIndex = transactionIndex; + } + } }