DBZ-4366 Re-organize code, re-use static values
This commit is contained in:
parent
eb7c5b02c6
commit
8f53b7abea
@ -62,6 +62,10 @@ public class OracleValueConverters extends JdbcValueConverters {
|
|||||||
* Marker value indicating an unavilable column value.
|
* Marker value indicating an unavilable column value.
|
||||||
*/
|
*/
|
||||||
public static final Object UNAVAILABLE_VALUE = new Object();
|
public static final Object UNAVAILABLE_VALUE = new Object();
|
||||||
|
public static final String EMPTY_BLOB_FUNCTION = "EMPTY_BLOB()";
|
||||||
|
public static final String EMPTY_CLOB_FUNCTION = "EMPTY_CLOB()";
|
||||||
|
public static final String HEXTORAW_FUNCTION_START = "HEXTORAW('";
|
||||||
|
public static final String HEXTORAW_FUNCTION_END = "')";
|
||||||
|
|
||||||
private static final Pattern INTERVAL_DAY_SECOND_PATTERN = Pattern.compile("([+\\-])?(\\d+) (\\d+):(\\d+):(\\d+).(\\d+)");
|
private static final Pattern INTERVAL_DAY_SECOND_PATTERN = Pattern.compile("([+\\-])?(\\d+) (\\d+):(\\d+):(\\d+).(\\d+)");
|
||||||
|
|
||||||
@ -99,11 +103,6 @@ public class OracleValueConverters extends JdbcValueConverters {
|
|||||||
.appendOffset("+HH:MM", "")
|
.appendOffset("+HH:MM", "")
|
||||||
.toFormatter();
|
.toFormatter();
|
||||||
|
|
||||||
private static final String EMPTY_BLOB_FUNCTION = "EMPTY_BLOB()";
|
|
||||||
private static final String EMPTY_CLOB_FUNCTION = "EMPTY_CLOB()";
|
|
||||||
private static final String HEXTORAW_FUNCTION_START = "HEXTORAW('";
|
|
||||||
private static final String HEXTORAW_FUNCTION_END = "')";
|
|
||||||
|
|
||||||
private static final Pattern TO_TIMESTAMP = Pattern.compile("TO_TIMESTAMP\\('(.*)'\\)", Pattern.CASE_INSENSITIVE);
|
private static final Pattern TO_TIMESTAMP = Pattern.compile("TO_TIMESTAMP\\('(.*)'\\)", Pattern.CASE_INSENSITIVE);
|
||||||
private static final Pattern TO_TIMESTAMP_TZ = Pattern.compile("TO_TIMESTAMP_TZ\\('(.*)'\\)", Pattern.CASE_INSENSITIVE);
|
private static final Pattern TO_TIMESTAMP_TZ = Pattern.compile("TO_TIMESTAMP_TZ\\('(.*)'\\)", Pattern.CASE_INSENSITIVE);
|
||||||
private static final Pattern TO_DATE = Pattern.compile("TO_DATE\\('(.*)',[ ]*'(.*)'\\)", Pattern.CASE_INSENSITIVE);
|
private static final Pattern TO_DATE = Pattern.compile("TO_DATE\\('(.*)',[ ]*'(.*)'\\)", Pattern.CASE_INSENSITIVE);
|
||||||
|
@ -74,22 +74,12 @@ public class TransactionCommitConsumer implements AutoCloseable, BlockingConsume
|
|||||||
private final BlockingConsumer<LogMinerEvent> delegate;
|
private final BlockingConsumer<LogMinerEvent> delegate;
|
||||||
private final OracleConnectorConfig connectorConfig;
|
private final OracleConnectorConfig connectorConfig;
|
||||||
private final OracleDatabaseSchema schema;
|
private final OracleDatabaseSchema schema;
|
||||||
|
|
||||||
private class RowState {
|
|
||||||
final DmlEvent event;
|
|
||||||
final int transactionIndex;
|
|
||||||
|
|
||||||
RowState(final DmlEvent event, final int transactionIndex) {
|
|
||||||
this.event = event;
|
|
||||||
this.transactionIndex = transactionIndex;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private final Map<String, RowState> rows = new HashMap<>();
|
private final Map<String, RowState> rows = new HashMap<>();
|
||||||
|
|
||||||
private String currentLobRowId;
|
private String currentLobRowId;
|
||||||
private String currentLobColumnName;
|
private String currentLobColumnName;
|
||||||
private int currentLobColumnPosition = -1;
|
private int currentLobColumnPosition = -1;
|
||||||
|
private int transactionIndex = 0;
|
||||||
|
|
||||||
public TransactionCommitConsumer(BlockingConsumer<LogMinerEvent> delegate, OracleConnectorConfig connectorConfig, OracleDatabaseSchema schema) {
|
public TransactionCommitConsumer(BlockingConsumer<LogMinerEvent> delegate, OracleConnectorConfig connectorConfig, OracleDatabaseSchema schema) {
|
||||||
this.delegate = delegate;
|
this.delegate = delegate;
|
||||||
@ -100,7 +90,7 @@ public TransactionCommitConsumer(BlockingConsumer<LogMinerEvent> delegate, Oracl
|
|||||||
@Override
|
@Override
|
||||||
public void close() throws InterruptedException {
|
public void close() throws InterruptedException {
|
||||||
// dispatch the remaining events in the order we received them from LogMiner
|
// dispatch the remaining events in the order we received them from LogMiner
|
||||||
List<RowState> pending = new ArrayList<RowState>(rows.values());
|
List<RowState> pending = new ArrayList<>(rows.values());
|
||||||
Collections.sort(pending, (a, b) -> a.transactionIndex - b.transactionIndex);
|
Collections.sort(pending, (a, b) -> a.transactionIndex - b.transactionIndex);
|
||||||
for (final RowState rowState : pending) {
|
for (final RowState rowState : pending) {
|
||||||
prepareAndDispatch(rowState.event);
|
prepareAndDispatch(rowState.event);
|
||||||
@ -123,8 +113,6 @@ public void accept(LogMinerEvent event) throws InterruptedException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private int transactionIndex = 0;
|
|
||||||
|
|
||||||
private void acceptDmlEvent(DmlEvent event) throws InterruptedException {
|
private void acceptDmlEvent(DmlEvent event) throws InterruptedException {
|
||||||
transactionIndex++;
|
transactionIndex++;
|
||||||
|
|
||||||
@ -316,7 +304,8 @@ static class LobFragment {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void initializeFromData(String data) {
|
private void initializeFromData(String data) {
|
||||||
this.binary = data.startsWith("HEXTORAW('") && data.endsWith("')");
|
this.binary = data.startsWith(OracleValueConverters.HEXTORAW_FUNCTION_START)
|
||||||
|
&& data.endsWith(OracleValueConverters.HEXTORAW_FUNCTION_END);
|
||||||
if (this.binary) {
|
if (this.binary) {
|
||||||
try {
|
try {
|
||||||
this.bytes = RAW.hexString2Bytes(data.substring(10, data.length() - 2));
|
this.bytes = RAW.hexString2Bytes(data.substring(10, data.length() - 2));
|
||||||
@ -406,9 +395,7 @@ void append(LobFragment other) {
|
|||||||
|
|
||||||
static String spaces(int length) {
|
static String spaces(int length) {
|
||||||
char[] backing = new char[length];
|
char[] backing = new char[length];
|
||||||
for (int i = 0; i < backing.length; i++) {
|
Arrays.fill(backing, ' ');
|
||||||
backing[i] = ' ';
|
|
||||||
}
|
|
||||||
return new String(backing);
|
return new String(backing);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -527,11 +514,11 @@ Object merge() {
|
|||||||
if (isNull) {
|
if (isNull) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
if (end == 0 && binary) {
|
if (end == 0) {
|
||||||
return "EMPTY_BLOB()";
|
if (binary) {
|
||||||
|
return OracleValueConverters.EMPTY_BLOB_FUNCTION;
|
||||||
}
|
}
|
||||||
if (end == 0 && !binary) {
|
return OracleValueConverters.EMPTY_CLOB_FUNCTION;
|
||||||
return "EMPTY_CLOB()";
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (binary) {
|
if (binary) {
|
||||||
@ -583,11 +570,11 @@ static LobUnderConstruction fromInitialValue(Object value) {
|
|||||||
if (value instanceof String) {
|
if (value instanceof String) {
|
||||||
String strval = (String) value;
|
String strval = (String) value;
|
||||||
LobUnderConstruction lob = new LobUnderConstruction();
|
LobUnderConstruction lob = new LobUnderConstruction();
|
||||||
if (strval.equals("EMPTY_BLOB()")) {
|
if (OracleValueConverters.EMPTY_BLOB_FUNCTION.equals(strval)) {
|
||||||
lob.binary = true;
|
lob.binary = true;
|
||||||
lob.isNull = false; // lob must be emitted
|
lob.isNull = false; // lob must be emitted
|
||||||
}
|
}
|
||||||
else if (strval.equals("EMPTY_CLOB()")) {
|
else if (OracleValueConverters.EMPTY_CLOB_FUNCTION.equals(strval)) {
|
||||||
lob.binary = false;
|
lob.binary = false;
|
||||||
lob.isNull = false; // lob must be emitted
|
lob.isNull = false; // lob must be emitted
|
||||||
}
|
}
|
||||||
@ -601,4 +588,14 @@ else if (strval.equals("EMPTY_CLOB()")) {
|
|||||||
return new LobUnderConstruction();
|
return new LobUnderConstruction();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class RowState {
|
||||||
|
final DmlEvent event;
|
||||||
|
final int transactionIndex;
|
||||||
|
|
||||||
|
RowState(final DmlEvent event, final int transactionIndex) {
|
||||||
|
this.event = event;
|
||||||
|
this.transactionIndex = transactionIndex;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user