DBZ-4366 basic support for DBMS_LOB package

Make less assumptions about the order of SELECT_LOB_LOCATOR and
LOB_WRITE events when a LOB value is constructed. The current approach
also supports interleaved LOB creation (piecemeal creation of 2 large
LOB values in different rows or columns at the same time).

There is still no support for partial LOB manipulations (trimming a LOB,
appending a couple of bytes, overwriting just a couple of bytes in the
middle, ...). To properly support that, a wholly different approach is
needed.

The internal representation of BLOB values for LogMiner has been changed
from `HEXTORAW('...')` and `BlobChunkList` to a simple byte array for
efficiency.
This commit is contained in:
Dominique Chanet 2021-12-05 13:00:55 +01:00 committed by Chris Cranford
parent a04238ce2b
commit eb7c5b02c6
No known key found for this signature in database
GPG Key ID: 8B8088D41181D04B
9 changed files with 1105 additions and 289 deletions

View File

@ -1,27 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle;
import java.util.ArrayList;
import java.util.List;
/**
* A "marker" class for passing a collection of Blob data type chunks to {@link OracleValueConverters}
* so that each chunk can be converted, decoded, and combined into a single binary representation
* for event emission.
*
* @author Chris Cranford
*/
public class BlobChunkList extends ArrayList<String> {
/**
* Creates a BLOB chunk list backed by the provided collection.
*
* @param backingList collection of BLOB chunks
*/
public BlobChunkList(List<String> backingList) {
super(backingList);
}
}

View File

@ -314,17 +314,6 @@ else if (isHexToRawFunctionCall(str)) {
data = RAW.hexString2Bytes(getHexToRawHexString(str));
}
}
else if (data instanceof BlobChunkList) {
if (!lobEnabled) {
if (column.isOptional()) {
return null;
}
data = NumberConversions.BYTE_ZERO;
}
else {
data = convertBlobChunkList((BlobChunkList) data);
}
}
else if (data instanceof Blob) {
if (!lobEnabled) {
if (column.isOptional()) {
@ -351,21 +340,6 @@ else if (data instanceof Blob) {
}
}
private byte[] convertBlobChunkList(BlobChunkList chunks) throws SQLException {
if (chunks.isEmpty()) {
// if there are no chunks, simply return null
return null;
}
// Iterate each chunk's hex-string and combine them together.
final StringBuilder hexString = new StringBuilder();
for (String chunk : chunks) {
hexString.append(getHexToRawHexString(chunk));
}
return RAW.hexString2Bytes(hexString.toString());
}
@Override
protected Object convertInteger(Column column, Field fieldDefn, Object data) {
if (data instanceof NUMBER) {

View File

@ -18,24 +18,40 @@
public class LobWriteEvent extends LogMinerEvent {
private final String data;
private final int offset;
private final int length;
public LobWriteEvent(LogMinerEventRow row, String data) {
public LobWriteEvent(LogMinerEventRow row, String data, int offset, int length) {
super(row);
this.data = data;
this.offset = offset;
this.length = length;
}
public LobWriteEvent(EventType eventType, Scn scn, TableId tableId, String rowId, String rsId, Instant changeTime, String data) {
public LobWriteEvent(EventType eventType, Scn scn, TableId tableId, String rowId, String rsId, Instant changeTime, String data, int offset, int length) {
super(eventType, scn, tableId, rowId, rsId, changeTime);
this.data = data;
this.offset = offset;
this.length = length;
}
public String getData() {
return data;
}
public int getOffset() {
return offset;
}
public int getLength() {
return length;
}
@Override
public String toString() {
return "LobWriteEvent{" +
"offset=" + offset + ", " +
"length=" + length + ", " +
"data='" + data + '\'' +
"} " + super.toString();
}

View File

@ -13,6 +13,8 @@
import java.util.Iterator;
import java.util.Map;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -626,7 +628,10 @@ protected void handleLobWrite(LogMinerEventRow row) {
}
if (row.getRedoSql() != null) {
addToTransaction(row.getTransactionId(), row, () -> new LobWriteEvent(row, parseLobWriteSql(row.getRedoSql())));
addToTransaction(row.getTransactionId(), row, () -> {
final ParsedLobWriteSql parsed = parseLobWriteSql(row.getRedoSql());
return new LobWriteEvent(row, parsed.data, parsed.offset, parsed.length);
});
}
}
@ -898,6 +903,9 @@ private LogMinerDmlEntry parseDmlStatement(String redoSql, Table table, String t
return dmlEntry;
}
private static Pattern LOB_WRITE_SQL_PATTERN = Pattern.compile(
"(?s).* := ((?:HEXTORAW\\()?'.*'(?:\\))?);\\s*dbms_lob.write\\([^,]+,\\s*(\\d+)\\s*,\\s*(\\d+)\\s*,[^,]+\\);.*");
/**
* Parses a {@code LOB_WRITE} operation SQL fragment.
*
@ -905,26 +913,36 @@ private LogMinerDmlEntry parseDmlStatement(String redoSql, Table table, String t
* @return the parsed statement
* @throws DebeziumException if an unexpected SQL fragment is provided that cannot be parsed
*/
private String parseLobWriteSql(String sql) {
private ParsedLobWriteSql parseLobWriteSql(String sql) {
if (sql == null) {
return null;
}
int start = sql.indexOf(":= '");
if (start != -1) {
// LOB_WRITE SQL is for a CLOB field
int end = sql.lastIndexOf("'");
return sql.substring(start + 4, end);
Matcher m = LOB_WRITE_SQL_PATTERN.matcher(sql.trim());
if (!m.matches()) {
throw new DebeziumException("Unable to parse unsupported LOB_WRITE SQL: " + sql);
}
start = sql.indexOf(":= HEXTORAW");
if (start != -1) {
// LOB_WRITE SQL is for a BLOB field
int end = sql.lastIndexOf("'") + 2;
return sql.substring(start + 3, end);
String data = m.group(1);
if (data.startsWith("'")) {
// string data; drop the quotes
data = data.substring(1, data.length() - 1);
}
int length = Integer.parseInt(m.group(2));
int offset = Integer.parseInt(m.group(3)) - 1; // Oracle uses 1-based offsets
return new ParsedLobWriteSql(offset, length, data);
}
throw new DebeziumException("Unable to parse unsupported LOB_WRITE SQL: " + sql);
private class ParsedLobWriteSql {
final int offset;
final int length;
final String data;
ParsedLobWriteSql(int _offset, int _length, String _data) {
offset = _offset;
length = _length;
data = _data;
}
}
/**

View File

@ -5,15 +5,21 @@
*/
package io.debezium.connector.oracle.logminer.processor;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.connector.oracle.BlobChunkList;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleValueConverters;
@ -26,6 +32,8 @@
import io.debezium.function.BlockingConsumer;
import io.debezium.relational.Table;
import oracle.sql.RAW;
/**
* A consumer of transaction events at commit time that is capable of inspecting the event stream,
* merging events that should be merged when LOB support is enabled, and then delegating the final
@ -66,38 +74,36 @@ public class TransactionCommitConsumer implements AutoCloseable, BlockingConsume
private final BlockingConsumer<LogMinerEvent> delegate;
private final OracleConnectorConfig connectorConfig;
private final OracleDatabaseSchema schema;
private final List<String> lobWriteData;
/**
* Describes the current LOB event buffering state, whether we're working on a series of
* {@code LOB_WRITE} events, {@code LOB_ERASE} events, or any other type of event that
* does not require special handling.
*/
enum LobState {
WRITE,
ERASE,
OTHER
};
private class RowState {
final DmlEvent event;
final int transactionIndex;
private LogMinerEvent lastEvent;
private SelectLobLocatorEvent lastSelectLobLocatorEvent;
private LobState lobState;
RowState(final DmlEvent event, final int transactionIndex) {
this.event = event;
this.transactionIndex = transactionIndex;
}
}
private final Map<String, RowState> rows = new HashMap<>();
private String currentLobRowId;
private String currentLobColumnName;
private int currentLobColumnPosition = -1;
public TransactionCommitConsumer(BlockingConsumer<LogMinerEvent> delegate, OracleConnectorConfig connectorConfig, OracleDatabaseSchema schema) {
this.delegate = delegate;
this.lobState = LobState.OTHER;
this.lobWriteData = new ArrayList<>();
this.connectorConfig = connectorConfig;
this.schema = schema;
}
@Override
public void close() throws InterruptedException {
if (lastEvent != null) {
if (!lobWriteData.isEmpty()) {
mergeLobWriteData(lastEvent);
}
dispatchChangeEvent(lastEvent);
// dispatch the remaining events in the order we received them from LogMiner
List<RowState> pending = new ArrayList<RowState>(rows.values());
Collections.sort(pending, (a, b) -> a.transactionIndex - b.transactionIndex);
for (final RowState rowState : pending) {
prepareAndDispatch(rowState.event);
}
}
@ -109,32 +115,141 @@ public void accept(LogMinerEvent event) throws InterruptedException {
return;
}
if (lastEvent == null) {
// Always cache first event, follow-up events will dictate merge/dispatch status
this.lastEvent = event;
if (EventType.SELECT_LOB_LOCATOR == event.getEventType()) {
this.lastSelectLobLocatorEvent = (SelectLobLocatorEvent) event;
}
if (event instanceof DmlEvent) {
acceptDmlEvent((DmlEvent) event);
}
else {
acceptLobManipulationEvent(event);
}
}
// Check whether the LOB data queue needs to be drained to the last event
LobState currentLobState = resolveLobStateByCurrentEvent(event);
if (currentLobState != this.lobState) {
if (this.lobState == LobState.WRITE) {
mergeLobWriteData(lastEvent);
private int transactionIndex = 0;
private void acceptDmlEvent(DmlEvent event) throws InterruptedException {
transactionIndex++;
final Table table = schema.tableFor(event.getTableId());
if (table == null) {
LOGGER.trace("Unable to locate table '{}' schema, ignoring event.", event.getTableId());
return;
}
String rowId = rowIdFromEvent(table, event);
RowState rowState = rows.get(rowId);
DmlEvent accumulatorEvent = (null == rowState) ? null : rowState.event;
if (!tryMerge(accumulatorEvent, event)) {
prepareAndDispatch(accumulatorEvent);
if (rowId.equals(currentLobRowId)) {
currentLobRowId = null;
currentLobColumnName = null;
}
rows.put(rowId, new RowState(event, transactionIndex));
accumulatorEvent = event;
}
if (EventType.SELECT_LOB_LOCATOR == event.getEventType()) {
currentLobRowId = rowId;
currentLobColumnName = ((SelectLobLocatorEvent) event).getColumnName();
currentLobColumnPosition = LogMinerHelper.getColumnIndexByName(currentLobColumnName, table);
// put a LobUnderConstruction in the accumulating event's newValues
Object[] values = newValues(accumulatorEvent);
Object prevValue = values[currentLobColumnPosition];
values[currentLobColumnPosition] = LobUnderConstruction.fromInitialValue(prevValue);
}
}
private void acceptLobManipulationEvent(LogMinerEvent event) {
if (null == currentLobRowId || null == currentLobColumnName) {
// should only happen when we start streaming in the middle of a LOB transaction (DBZ-4367)
LOGGER.trace("Got LOB manipulation event without preceding LOB selector; ignoring {} {}.", event.getEventType(), event);
return;
}
if (EventType.LOB_WRITE != event.getEventType()) {
LOGGER.warn("\t{} for table '{}' column '{}' is not supported.", event.getEventType(), event.getTableId(), currentLobColumnName);
LOGGER.trace("All LOB manipulation events apart from LOB_WRITE are currently ignored; ignoring {} {}.", event.getEventType(), event);
return;
}
LobUnderConstruction lob = (LobUnderConstruction) newValues(rows.get(currentLobRowId).event)[currentLobColumnPosition];
try {
lob.add(new LobFragment(event));
}
catch (final DebeziumException exception) {
LOGGER.warn("\tInvalid LOB manipulation event: {} ; ignoring {} {}", exception, event.getEventType(), event);
}
}
private void prepareAndDispatch(DmlEvent event) throws InterruptedException {
if (null == event) { // we just added the first event for this row
return;
}
Object[] values = newValues(event);
for (int i = 0; i < values.length; i++) {
if (values[i] instanceof LobUnderConstruction) {
values[i] = ((LobUnderConstruction) values[i]).merge();
}
}
// don't emit change events for ignored LOB manipulations (i.e. event is SEL_LOB_LOCATOR
// and oldValues is equal to newValues)
if (EventType.SELECT_LOB_LOCATOR == event.getEventType()) {
boolean noop = true;
Object[] oldValues = oldValues(event);
for (int i = 0; i < values.length; i++) {
if (!Objects.equals(oldValues[i], values[i])) {
noop = false;
break;
}
this.lobState = currentLobState;
}
if (noop) {
LOGGER.trace("\tSkip emitting event {} {} because it's effectively a NOOP.", event.getEventType(), event);
return;
}
}
dispatchChangeEvent(event);
}
if (!isMerged(event, lastEvent)) {
LOGGER.trace("\tMerged skipped.");
// Events were not merged, dispatch last one and cache new
dispatchChangeEvent(lastEvent);
this.lastEvent = event;
}
else {
LOGGER.trace("\tMerged successfully.");
private boolean tryMerge(DmlEvent prev, DmlEvent next) {
if (prev == null) { // first event for this row.
return false;
}
// we can only merge into INSERT, UPDATE and SEL_LOB_LOCATOR
// we can only merge from UPDATE and SEL_LOB_LOCATOR
// merges _from_ SEL_LOB_LOCATOR are basically noops.
// merges _from_ UPDATE mean we have to override the specified values.
boolean merge = false;
switch (prev.getEventType()) {
case INSERT:
case UPDATE:
case SELECT_LOB_LOCATOR:
switch (next.getEventType()) {
case SELECT_LOB_LOCATOR:
merge = true;
break;
case UPDATE:
mergeEvents(prev, next);
merge = true;
break;
default:
}
default:
}
if (merge) {
LOGGER.trace("\tMerging {} event into previous {} event.", next.getEventType(), prev.getEventType());
}
return merge;
}
private void mergeEvents(DmlEvent into, DmlEvent from) {
Object[] intoVals = newValues(into);
Object[] fromVals = newValues(from);
for (int i = 0; i < intoVals.length; i++) {
if (fromVals[i] != null && !OracleValueConverters.UNAVAILABLE_VALUE.equals(fromVals[i])) {
LOGGER.trace("\t\tMerge column {}: replacing {} with {}.", i, intoVals[i], fromVals[i]);
intoVals[i] = fromVals[i];
}
}
}
@ -144,198 +259,346 @@ private void dispatchChangeEvent(LogMinerEvent event) throws InterruptedExceptio
delegate.accept(event);
}
private LobState resolveLobStateByCurrentEvent(LogMinerEvent event) {
switch (event.getEventType()) {
case LOB_WRITE:
return LobState.WRITE;
case LOB_ERASE:
return LobState.ERASE;
default:
return LobState.OTHER;
}
}
private String rowIdFromEvent(Table table, DmlEvent event) {
List<String> idParts = new ArrayList<>();
idParts.add(event.getTableId().toString());
private boolean isMerged(LogMinerEvent event, LogMinerEvent prevEvent) {
LOGGER.trace("\tVerifying merge eligibility for event {} with {}", event.getEventType(), prevEvent.getEventType());
if (EventType.SELECT_LOB_LOCATOR == event.getEventType()) {
SelectLobLocatorEvent locatorEvent = (SelectLobLocatorEvent) event;
this.lastSelectLobLocatorEvent = locatorEvent;
if (EventType.INSERT == prevEvent.getEventType()) {
// Previous event is an INSERT
// Only merge the SEL_LOB_LOCATOR event if the previous INSERT is for the same table/row
// and if the INSERT's column value is either EMPTY_CLOB() or EMPTY_BLOB()
if (isForSameTableOrScn(event, prevEvent)) {
LOGGER.trace("\tMerging SEL_LOB_LOCATOR with previous INSERT event");
return true;
}
}
else if (EventType.UPDATE == prevEvent.getEventType()) {
if (isForSameTableOrScn(event, prevEvent) && isSameTableRow(event, prevEvent)) {
LOGGER.trace("\tUpdating SEL_LOB_LOCATOR column '{}' to previous UPDATE event", locatorEvent.getColumnName());
return true;
}
}
else if (EventType.SELECT_LOB_LOCATOR == prevEvent.getEventType()) {
if (isForSameTableOrScn(event, prevEvent) && isSameTableRow(event, prevEvent)) {
LOGGER.trace("\tAdding column '{}' to previous SEL_LOB_LOCATOR event", locatorEvent.getColumnName());
return true;
}
}
}
else if (EventType.LOB_WRITE == event.getEventType()) {
final LobWriteEvent writeEvent = (LobWriteEvent) event;
if (lastSelectLobLocatorEvent.isBinary()) {
if (!writeEvent.getData().startsWith("HEXTORAW('") && !writeEvent.getData().endsWith("')")) {
throw new DebeziumException("Unexpected LOB data chunk: " + writeEvent.getData());
}
}
LOGGER.trace("\tAdded LOB_WRITE data to internal LOB queue.");
lobWriteData.add(writeEvent.getData());
return true;
}
else if (EventType.LOB_ERASE == event.getEventType()) {
// nothing is done with the event, its just consumed and treated as merged.
LOGGER.warn("\tLOB_ERASE for table '{}' column '{}' is not supported.",
lastSelectLobLocatorEvent.getTableId(), lastSelectLobLocatorEvent.getColumnName());
if (lastEvent != null && EventType.SELECT_LOB_LOCATOR == lastEvent.getEventType()) {
LOGGER.trace("\tSkipped LOB_ERASE, discarding it and the prior SELECT_LOB_LOCATOR");
lastEvent = null;
return true;
}
LOGGER.trace("\tSkipped LOB_ERASE, treated as merged.");
return true;
}
else if (EventType.LOB_TRIM == event.getEventType()) {
// nothing is done with the event, its just consumed and treated as merged.
LOGGER.trace("\tSkipped LOB_TRIM, treated as merged.");
return true;
}
else if (EventType.INSERT == event.getEventType() || EventType.UPDATE == event.getEventType()) {
// Previous event is an INSERT
// The only valid combination here would be if the current event is an UPDATE since an INSERT
// cannot be merged with a prior INSERT.
if (EventType.INSERT == prevEvent.getEventType()) {
if (EventType.UPDATE == event.getEventType()) {
if (isForSameTableOrScn(event, prevEvent) && isSameTableRow(event, prevEvent)) {
LOGGER.trace("\tMerging UPDATE event with previous INSERT event");
mergeNewColumns((DmlEvent) event, (DmlEvent) prevEvent);
return true;
}
}
}
else if (EventType.UPDATE == prevEvent.getEventType()) {
// Previous event is an UPDATE
// This will happen if there are non LOB and inline LOB fields updated in the same SQL.
// The inline LOB values should be merged with the previous UPDATE event
if (EventType.UPDATE == event.getEventType()) {
if (isForSameTableOrScn(event, prevEvent) && isSameTableRow(event, prevEvent)) {
LOGGER.trace("\tMerging UPDATE event with previous UPDATE event");
mergeNewColumns((DmlEvent) event, (DmlEvent) prevEvent);
return true;
}
}
}
else if (EventType.SELECT_LOB_LOCATOR == prevEvent.getEventType()) {
// Previous event is a SEL_LOB_LOCATOR
// SQL contains both non-inline LOB and inline-LOB field changes
if (EventType.UPDATE == event.getEventType()) {
if (isForSameTableOrScn(event, prevEvent) && isSameTableRow(event, prevEvent)) {
for (int i = 0; i < ((DmlEvent) event).getDmlEntry().getNewValues().length; ++i) {
Object value = ((DmlEvent) event).getDmlEntry().getNewValues()[i];
Object prevValue = ((DmlEvent) prevEvent).getDmlEntry().getNewValues()[i];
if (prevValue == null && value != null) {
LOGGER.trace("\tAdding column index {} to previous SEL_LOB_LOCATOR event", i);
((DmlEvent) prevEvent).getDmlEntry().getNewValues()[i] = value;
}
}
return true;
}
}
}
}
Object[] values = (EventType.DELETE == event.getEventType()) ? oldValues(event) : newValues(event);
LOGGER.trace("\tEvent {} is for a different row, merge skipped.", event.getEventType());
return false;
}
private void mergeLobWriteData(LogMinerEvent event) {
final Object data;
if (this.lastSelectLobLocatorEvent.isBinary()) {
// For BLOB we pass the list of chunks as-is to the value converter
data = new BlobChunkList(lobWriteData);
}
else {
// For CLOB we go ahead and pre-process the list into a single string
data = String.join("", lobWriteData);
}
final DmlEvent dmlEvent = (DmlEvent) event;
final String columnName = lastSelectLobLocatorEvent.getColumnName();
final int columnIndex = getSelectLobLocatorColumnIndex();
LOGGER.trace("\tSet LOB data for column '{}' on table {} in event {}", columnName, event.getTableId(), event.getEventType());
dmlEvent.getDmlEntry().getNewValues()[columnIndex] = data;
lobWriteData.clear();
}
private int getSelectLobLocatorColumnIndex() {
final String columnName = lastSelectLobLocatorEvent.getColumnName();
return LogMinerHelper.getColumnIndexByName(columnName, schema.tableFor(lastSelectLobLocatorEvent.getTableId()));
}
private boolean isForSameTableOrScn(LogMinerEvent event, LogMinerEvent prevEvent) {
if (prevEvent != null) {
if (event.getTableId().equals(prevEvent.getTableId())) {
return true;
}
return event.getScn().equals(prevEvent.getScn()) && event.getRsId().equals(prevEvent.getRsId());
}
return false;
}
private boolean isSameTableRow(LogMinerEvent event, LogMinerEvent prevEvent) {
final Table table = schema.tableFor(event.getTableId());
if (table == null) {
LOGGER.trace("Unable to locate table '{}' schema, unable to detect if same row.", event.getTableId());
return false;
}
for (String columnName : table.primaryKeyColumnNames()) {
int position = LogMinerHelper.getColumnIndexByName(columnName, table);
Object prevValue = ((DmlEvent) prevEvent).getDmlEntry().getNewValues()[position];
if (prevValue == null) {
throw new DebeziumException("Could not find column " + columnName + " in previous event");
if (position >= values.length) {
throw new DebeziumException("Field values corrupt for " + event.getEventType() + " " + event);
}
Object value = ((DmlEvent) event).getDmlEntry().getNewValues()[position];
Object value = values[position];
if (value == null) {
throw new DebeziumException("Could not find column " + columnName + " in event");
}
if (!Objects.equals(value, prevValue)) {
return false;
}
idParts.add(value.toString());
}
return true;
return String.join("|", idParts);
}
private void mergeNewColumns(DmlEvent event, DmlEvent prevEvent) {
final boolean prevEventIsInsert = EventType.INSERT == prevEvent.getEventType();
for (int i = 0; i < event.getDmlEntry().getNewValues().length; ++i) {
Object value = event.getDmlEntry().getNewValues()[i];
Object prevValue = prevEvent.getDmlEntry().getNewValues()[i];
if (prevEventIsInsert && "EMPTY_CLOB()".equals(prevValue)) {
LOGGER.trace("\tAssigning column index {} with updated CLOB value.", i);
prevEvent.getDmlEntry().getNewValues()[i] = value;
private Object[] newValues(DmlEvent event) {
return event.getDmlEntry().getNewValues();
}
private Object[] oldValues(DmlEvent event) {
return event.getDmlEntry().getOldValues();
}
static class LobFragment {
boolean binary;
String data;
byte[] bytes;
int offset;
LobFragment(final LogMinerEvent event) {
if (EventType.LOB_WRITE != event.getEventType()) {
throw new IllegalArgumentException("can only construct LobFragments from LOB_WRITE events");
}
else if (prevEventIsInsert && "EMPTY_BLOB()".equals(prevValue)) {
LOGGER.trace("\tAssigning column index {} with updated BLOB value.", i);
prevEvent.getDmlEntry().getNewValues()[i] = value;
final LobWriteEvent writeEvent = (LobWriteEvent) event;
initializeFromData(writeEvent.getData());
this.offset = writeEvent.getOffset();
// DBMS_LOB.WRITE rules:
// length (from the writeEvent) may not be larger than buffer length, but it may be shorter. We don't expect
// that to happen in the LogMiner events, but it doesn't hurt to check.
final int eventLength = writeEvent.getLength();
if (eventLength < length()) {
truncate(eventLength);
}
else if (!prevEventIsInsert && OracleValueConverters.UNAVAILABLE_VALUE.equals(value)) {
LOGGER.trace("\tSkipped column index {} with unavailable column value.", i);
}
LobFragment(final String value) {
initializeFromData(value);
this.offset = 0;
}
private void initializeFromData(String data) {
this.binary = data.startsWith("HEXTORAW('") && data.endsWith("')");
if (this.binary) {
try {
this.bytes = RAW.hexString2Bytes(data.substring(10, data.length() - 2));
}
catch (SQLException e) {
throw new DebeziumException("malformed hex string in LogMiner event BLOB value", e);
}
}
else if (!prevEventIsInsert && value != null) {
LOGGER.trace("\tUpdating column index {} in previous event", i);
prevEvent.getDmlEntry().getNewValues()[i] = value;
else {
this.data = data;
}
}
int length() {
return binary ? bytes.length : data.length();
}
int end() {
return offset + length();
}
void truncate(int newLength) {
if (newLength > length()) {
throw new DebeziumException("cannot truncate LOB fragment from length " + length() + " to length " + newLength);
}
if (binary) {
bytes = Arrays.copyOf(bytes, newLength);
}
else {
data = data.substring(0, newLength);
}
}
void frontTruncate(int newLength) {
if (newLength > length()) {
throw new DebeziumException("cannot front-truncate LOB fragment from length " + length() + " to length " + newLength);
}
if (binary) {
bytes = Arrays.copyOfRange(bytes, bytes.length - newLength, bytes.length);
}
else {
data = data.substring(data.length() - newLength);
}
offset += length() - newLength;
}
void absorb(LobFragment other) {
if (other.offset < offset || other.end() > end()) {
throw new DebeziumException(
"cannot absorb fragment (" + other.offset + ", " + other.end() + ") into fragment " +
"(" + offset + ", " + end() + ") because the absorbee does not fully overlap the absorber");
}
int prefixEnd = other.offset - offset;
int suffixStart = other.end() - offset;
if (binary) {
System.arraycopy(other.bytes, 0, bytes, prefixEnd, other.bytes.length);
}
else {
data = data.substring(0, prefixEnd) + other.data + data.substring(suffixStart);
}
}
void append(LobFragment other) {
if (other.offset < end()) {
throw new DebeziumException("cannot append fragment: offset " + other.offset + " is before this " +
"fragment's end " + end());
}
if (binary) {
bytes = Arrays.copyOf(bytes, other.end() - offset); // pads with zeroes
System.arraycopy(other.bytes, 0, bytes, other.offset - offset, other.bytes.length);
}
else {
int gap = other.offset - end();
if (gap > 0) {
data = data + spaces(gap) + other.data;
}
else {
data = data + other.data;
}
}
}
static String spaces(int length) {
char[] backing = new char[length];
for (int i = 0; i < backing.length; i++) {
backing[i] = ' ';
}
return new String(backing);
}
}
static class LobUnderConstruction {
final List<LobFragment> fragments = new LinkedList<>();
int start = 0;
int end = 0;
boolean binary = false;
boolean isNull = true; // result of #merge() should be null (for instances that are never written to)
int middleInserts = 0;
void add(LobFragment fragment) {
isNull = false;
if (fragments.isEmpty()) { // first fragment to be added
fragments.add(fragment);
start = fragment.offset;
end = fragment.end();
binary = fragment.binary;
return;
}
if (fragment.binary != binary) {
throw new DebeziumException("mixing binary and non-binary writes in a single LOB");
}
if (fragment.offset >= end) { // the expected case
fragments.add(fragment);
end = fragment.end();
return;
}
// the uncommon case: writing somewhere in the middle
middleInserts++;
if (middleInserts % 10 == 0) {
compact(); // try to keep the linear search time within reasonable bounds
}
// find the right spot to insert
ListIterator<LobFragment> iter = fragments.listIterator();
while (iter.hasNext()) {
LobFragment frag = iter.next();
if (fragment.offset < frag.end() && fragment.offset >= frag.offset) {
if (fragment.end() >= frag.end()) { // fragment partially overlaps frag
// truncate frag and insert after
frag.truncate(fragment.offset - frag.offset);
iter.add(fragment);
}
else { // fragment overlaps frag entirely
frag.absorb(fragment);
}
break;
}
if (frag.offset > fragment.offset) {
// insert before; no need to truncate preceding fragment
iter.previous();
iter.add(fragment);
break;
}
}
// are there any following fragments that are (partially) overwritten by the fragment we're inserting?
while (iter.hasNext()) {
LobFragment frag = iter.next();
if (frag.offset >= fragment.end()) { // we're done
break;
}
if (frag.end() <= fragment.end()) { // remove entirely
iter.remove();
}
else { // front-truncate
frag.frontTruncate(frag.end() - fragment.end());
}
}
// adjust start and end bookkeeping as necessary
if (fragment.offset < start) {
start = fragment.offset;
}
if (fragment.end() > end) {
end = fragment.end();
}
}
void compact() {
ListIterator<LobFragment> iter = fragments.listIterator();
if (!iter.hasNext()) {
return;
}
LobFragment prev = iter.next();
while (iter.hasNext()) {
LobFragment frag = iter.next();
if (frag.offset - prev.end() < 128) {
prev.append(frag);
iter.remove();
}
else {
prev = frag;
}
}
}
/**
* Merges all LOB fragments.
*
* Returns:
* - null if the isNull flag is set
* - "EMPTY_BLOB()" or "EMPTY_CLOB()" the lob is empty, but isNull is not set
* - a single String for (N)CLOB
* - a single byte[] from BLOB
* Any holes will be filled with spaces (CLOB) or zero bytes (BLOB) as per the specification of DBMS_LOB.WRITE.
*/
Object merge() {
if (isNull) {
return null;
}
if (end == 0 && binary) {
return "EMPTY_BLOB()";
}
if (end == 0 && !binary) {
return "EMPTY_CLOB()";
}
if (binary) {
byte[] buffer = new byte[end];
ListIterator<LobFragment> iter = fragments.listIterator();
while (iter.hasNext()) {
LobFragment frag = iter.next();
System.arraycopy(frag.bytes, 0, buffer, frag.offset, frag.bytes.length);
}
return buffer;
}
else {
StringBuilder builder = new StringBuilder();
int offset = 0;
ListIterator<LobFragment> iter = fragments.listIterator();
while (iter.hasNext()) {
LobFragment frag = iter.next();
if (offset < frag.offset) { // fill the holes between fragments
builder.append(LobFragment.spaces(frag.offset - offset));
}
if (frag.length() == 0) { // may happen in rare corner cases
continue;
}
builder.append(frag.data);
offset = frag.end();
}
return builder.toString();
}
}
public String toString() {
return "LobUnderConstruction{" +
"binary = " + binary +
", start = " + start +
", end = " + end +
", #fragments = " + fragments.size() +
"}";
}
// Creates a LobUnderConstruction instance from the initial value stored in the
// parent event's column.
static LobUnderConstruction fromInitialValue(Object value) {
if (null == value) {
return new LobUnderConstruction();
}
if (value instanceof LobUnderConstruction) {
return (LobUnderConstruction) value;
}
if (value instanceof String) {
String strval = (String) value;
LobUnderConstruction lob = new LobUnderConstruction();
if (strval.equals("EMPTY_BLOB()")) {
lob.binary = true;
lob.isNull = false; // lob must be emitted
}
else if (strval.equals("EMPTY_CLOB()")) {
lob.binary = false;
lob.isNull = false; // lob must be emitted
}
else {
lob.add(new LobFragment(strval));
}
return lob;
}
LOGGER.trace("Don't know how to construct an initial LOB value from {}.", value);
return new LobUnderConstruction();
}
}
}

View File

@ -46,11 +46,13 @@ public class LobWriteEventAdapter extends LogMinerEventAdapter {
* @param rsId the Oracle rollback segment identifier
* @param changeTime the time the change occurred
* @param data the LOB data
* @param offset the LOB data offset (0-based)
* @param length the LOB data length
* @return the constructed DmlEvent
*/
@ProtoFactory
public LobWriteEvent factory(int eventType, String scn, String tableId, String rowId, String rsId, String changeTime, String data) {
return new LobWriteEvent(EventType.from(eventType), Scn.valueOf(scn), TableId.parse(tableId), rowId, rsId, Instant.parse(changeTime), data);
public LobWriteEvent factory(int eventType, String scn, String tableId, String rowId, String rsId, String changeTime, String data, int offset, int length) {
return new LobWriteEvent(EventType.from(eventType), Scn.valueOf(scn), TableId.parse(tableId), rowId, rsId, Instant.parse(changeTime), data, offset, length);
}
/**
@ -63,4 +65,26 @@ public LobWriteEvent factory(int eventType, String scn, String tableId, String r
public String getData(LobWriteEvent event) {
return event.getData();
}
/**
* A ProtoStream handler to extract the {@code offset} field from a {@link LobWriteEvent} type.
*
* @param event the event instance, must not be {@code null}
* @return the offset of the data to be written for a LOB field
*/
@ProtoField(number = 8, required = true)
public int getOffset(LobWriteEvent event) {
return event.getOffset();
}
/**
* A ProtoStream handler to extract the {@code length} field from a {@link LobWriteEvent} type.
*
* @param event the event instance, must not be {@code null}
* @return the length of the data to be written for a LOB field
*/
@ProtoField(number = 9, required = true)
public int getLength(LobWriteEvent event) {
return event.getLength();
}
}

View File

@ -1443,6 +1443,242 @@ record = records.recordsForTopic(topicName("DBZ4276")).get(0);
}
}
@Test
@FixFor("DBZ-4366")
public void shouldStreamBlobsWrittenInChunkedMode() throws Exception {
TestHelper.dropTable(connection, "dbz4366");
try {
connection.execute("CREATE TABLE dbz4366 (id numeric(9,0), data blob not null, primary key(id))");
TestHelper.streamTable(connection, "dbz4366");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4366")
.with(OracleConnectorConfig.LOB_ENABLED, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.executeWithoutCommitting("INSERT INTO dbz4366 (id,data) values (1,EMPTY_BLOB())");
final String fillQuery = "DECLARE\n" +
" loc BLOB;\n" +
" i PLS_INTEGER;\n" +
"BEGIN\n" +
" SELECT data into loc FROM dbz4366 WHERE id = 1 FOR UPDATE;\n" +
" DBMS_LOB.OPEN(loc, DBMS_LOB.LOB_READWRITE);\n" +
" FOR i IN 1..1024 LOOP\n" +
" DBMS_LOB.WRITEAPPEND(loc, 1024, ?);\n" +
" END LOOP;\n" +
" DBMS_LOB.CLOSE(loc);\n" +
"END;";
connection.prepareQuery(fillQuery, ps -> ps.setBytes(1, part(BIN_DATA, 0, 1024)), null);
connection.execute("COMMIT");
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(topicName("DBZ4366"))).hasSize(1);
SourceRecord record = records.recordsForTopic(topicName("DBZ4366")).get(0);
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(1);
ByteBuffer data = (ByteBuffer) after.get("DATA");
assertThat(data.array().length).isEqualTo(1024 * 1024);
// As a sanity check, there should be no more records.
assertNoRecordsToConsume();
}
finally {
TestHelper.dropTable(connection, "dbz4366");
}
}
@Test
@FixFor("DBZ-4366")
public void shouldStreamBlobsWrittenInInterleavedChunkedMode() throws Exception {
TestHelper.dropTable(connection, "dbz4366");
try {
connection.execute("CREATE TABLE dbz4366 (id numeric(9,0), data blob not null, data2 blob not null, primary key(id))");
TestHelper.streamTable(connection, "dbz4366");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4366")
.with(OracleConnectorConfig.LOB_ENABLED, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.executeWithoutCommitting("INSERT INTO dbz4366 (id,data,data2) values (1,EMPTY_BLOB(),EMPTY_BLOB())");
final String fillQuery = "DECLARE\n" +
" loc BLOB;\n" +
" loc2 BLOB;\n" +
" i PLS_INTEGER;\n" +
"BEGIN\n" +
" FOR i IN 1..1024 LOOP\n" +
" SELECT data into loc FROM dbz4366 WHERE id = 1 FOR UPDATE;\n" +
" DBMS_LOB.OPEN(loc, DBMS_LOB.LOB_READWRITE);\n" +
" DBMS_LOB.WRITEAPPEND(loc, 1024, ?);\n" +
" DBMS_LOB.CLOSE(loc);\n" +
" \n" +
" SELECT data2 into loc2 FROM dbz4366 WHERE id = 1 FOR UPDATE;\n" +
" DBMS_LOB.OPEN(loc2, DBMS_LOB.LOB_READWRITE);\n" +
" DBMS_LOB.WRITEAPPEND(loc2, 1024, ?);\n" +
" DBMS_LOB.CLOSE(loc2);\n" +
" END LOOP;\n" +
"END;";
connection.prepareQuery(fillQuery, ps -> {
ps.setBytes(1, part(BIN_DATA, 0, 1024));
ps.setBytes(2, part(BIN_DATA, 0, 1024));
}, null);
connection.execute("COMMIT");
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(topicName("DBZ4366"))).hasSize(1);
SourceRecord record = records.recordsForTopic(topicName("DBZ4366")).get(0);
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(1);
ByteBuffer data = (ByteBuffer) after.get("DATA");
assertThat(data.array().length).isEqualTo(1024 * 1024);
ByteBuffer data2 = (ByteBuffer) after.get("DATA2");
assertThat(data2.array().length).isEqualTo(1024 * 1024);
// As a sanity check, there should be no more records.
assertNoRecordsToConsume();
}
finally {
TestHelper.dropTable(connection, "dbz4366");
}
}
@Test
@FixFor("DBZ-4366")
public void shouldStreamBlobsWrittenInInterleavedChunkedMode2() throws Exception {
TestHelper.dropTable(connection, "dbz4366");
try {
connection.execute("CREATE TABLE dbz4366 (id numeric(9,0), data blob not null, data2 blob not null, primary key(id))");
TestHelper.streamTable(connection, "dbz4366");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4366")
.with(OracleConnectorConfig.LOB_ENABLED, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.executeWithoutCommitting("INSERT INTO dbz4366 (id,data,data2) values (1,EMPTY_BLOB(),EMPTY_BLOB())");
final String fillQuery = "DECLARE\n" +
" loc BLOB;\n" +
" loc2 BLOB;\n" +
" i PLS_INTEGER;\n" +
"BEGIN\n" +
" SELECT data into loc FROM dbz4366 WHERE id = 1 FOR UPDATE;\n" +
" DBMS_LOB.OPEN(loc, DBMS_LOB.LOB_READWRITE);\n" +
" SELECT data2 into loc2 FROM dbz4366 WHERE id = 1 FOR UPDATE;\n" +
" DBMS_LOB.OPEN(loc2, DBMS_LOB.LOB_READWRITE);\n" +
" FOR i IN 1..1024 LOOP\n" +
" DBMS_LOB.WRITEAPPEND(loc, 1024, ?);\n" +
" DBMS_LOB.WRITEAPPEND(loc2, 1024, ?);\n" +
" END LOOP;\n" +
" DBMS_LOB.CLOSE(loc);\n" +
" DBMS_LOB.CLOSE(loc2);\n" +
"END;";
connection.prepareQuery(fillQuery, ps -> {
ps.setBytes(1, part(BIN_DATA, 0, 1024));
ps.setBytes(2, part(BIN_DATA, 0, 1024));
}, null);
connection.execute("COMMIT");
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(topicName("DBZ4366"))).hasSize(1);
SourceRecord record = records.recordsForTopic(topicName("DBZ4366")).get(0);
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(1);
ByteBuffer data = (ByteBuffer) after.get("DATA");
assertThat(data.array().length).isEqualTo(1024 * 1024);
ByteBuffer data2 = (ByteBuffer) after.get("DATA2");
assertThat(data2.array().length).isEqualTo(1024 * 1024);
// As a sanity check, there should be no more records.
assertNoRecordsToConsume();
}
finally {
TestHelper.dropTable(connection, "dbz4366");
}
}
@Test
@FixFor("DBZ-4366")
public void shouldStreamBlobsWrittenInInterleavedChunkedMode3() throws Exception {
TestHelper.dropTable(connection, "dbz4366");
try {
connection.execute("CREATE TABLE dbz4366 (id numeric(9,0), data blob not null, primary key(id))");
TestHelper.streamTable(connection, "dbz4366");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4366")
.with(OracleConnectorConfig.LOB_ENABLED, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.executeWithoutCommitting("INSERT INTO dbz4366 (id,data) values (1,EMPTY_BLOB())");
connection.executeWithoutCommitting("INSERT INTO dbz4366 (id,data) values (2,EMPTY_BLOB())");
final String fillQuery = "DECLARE\n" +
" loc BLOB;\n" +
" loc2 BLOB;\n" +
" i PLS_INTEGER;\n" +
"BEGIN\n" +
" SELECT data into loc FROM dbz4366 WHERE id = 1 FOR UPDATE;\n" +
" DBMS_LOB.OPEN(loc, DBMS_LOB.LOB_READWRITE);\n" +
" SELECT data into loc2 FROM dbz4366 WHERE id = 2 FOR UPDATE;\n" +
" DBMS_LOB.OPEN(loc2, DBMS_LOB.LOB_READWRITE);\n" +
" FOR i IN 1..1024 LOOP\n" +
" DBMS_LOB.WRITEAPPEND(loc, 1024, ?);\n" +
" DBMS_LOB.WRITEAPPEND(loc2, 1024, ?);\n" +
" END LOOP;\n" +
" DBMS_LOB.CLOSE(loc);\n" +
" DBMS_LOB.CLOSE(loc2);\n" +
"END;";
connection.prepareQuery(fillQuery, ps -> {
ps.setBytes(1, part(BIN_DATA, 0, 1024));
ps.setBytes(2, part(BIN_DATA, 0, 1024));
}, null);
connection.execute("COMMIT");
SourceRecords records = consumeRecordsByTopic(2);
assertThat(records.recordsForTopic(topicName("DBZ4366"))).hasSize(2);
SourceRecord record = records.recordsForTopic(topicName("DBZ4366")).get(0);
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(1);
ByteBuffer data = (ByteBuffer) after.get("DATA");
assertThat(data.array().length).isEqualTo(1024 * 1024);
record = records.recordsForTopic(topicName("DBZ4366")).get(1);
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(2);
data = (ByteBuffer) after.get("DATA");
assertThat(data.array().length).isEqualTo(1024 * 1024);
// As a sanity check, there should be no more records.
assertNoRecordsToConsume();
}
finally {
TestHelper.dropTable(connection, "dbz4366");
}
}
private static byte[] part(byte[] buffer, int start, int length) {
return Arrays.copyOfRange(buffer, start, length);
}

View File

@ -1840,6 +1840,66 @@ record = records.recordsForTopic(topicName("DBZ4276")).get(0);
}
}
@Test
@FixFor("DBZ-4366")
public void shouldStreamClobsWrittenInChunkedMode() throws Exception {
TestHelper.dropTable(connection, "dbz4366");
try {
connection.execute("CREATE TABLE dbz4366 (id numeric(9,0), val_clob clob not null, val_nclob nclob not null, primary key(id))");
TestHelper.streamTable(connection, "dbz4366");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4366")
.with(OracleConnectorConfig.LOB_ENABLED, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.executeWithoutCommitting("INSERT INTO dbz4366 (id,val_clob,val_nclob) values (1,EMPTY_CLOB(),EMPTY_CLOB())");
// for bonus points, interleave the writes to the LOB fields
final String fillQuery = "DECLARE\n" +
" loc CLOB;\n" +
" nloc NCLOB;\n" +
" i PLS_INTEGER;\n" +
" str VARCHAR2(1024);\n" +
"BEGIN\n" +
" str := ?;\n" +
" SELECT val_clob into loc FROM dbz4366 WHERE id = 1 FOR UPDATE;\n" +
" SELECT val_nclob into nloc FROM dbz4366 WHERE id = 1 FOR UPDATE;\n" +
" DBMS_LOB.OPEN(loc, DBMS_LOB.LOB_READWRITE);\n" +
" DBMS_LOB.OPEN(nloc, DBMS_LOB.LOB_READWRITE);\n" +
" FOR i IN 1..1024 LOOP\n" +
" DBMS_LOB.WRITEAPPEND(loc, 1024, str);\n" +
" DBMS_LOB.WRITEAPPEND(nloc, 1024, str);\n" +
" END LOOP;\n" +
" DBMS_LOB.CLOSE(loc);\n" +
" DBMS_LOB.CLOSE(nloc);\n" +
"END;";
connection.prepareQuery(fillQuery, ps -> ps.setString(1, part(JSON_DATA, 0, 1024)), null);
connection.execute("COMMIT");
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(topicName("DBZ4366"))).hasSize(1);
SourceRecord record = records.recordsForTopic(topicName("DBZ4366")).get(0);
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(1);
String clobval = (String) after.get("VAL_CLOB");
assertThat(clobval.length()).isEqualTo(1024 * 1024);
String nclobval = (String) after.get("VAL_NCLOB");
assertThat(nclobval.length()).isEqualTo(1024 * 1024);
// As a sanity check, there should be no more records.
assertNoRecordsToConsume();
}
finally {
TestHelper.dropTable(connection, "dbz4366");
}
}
private Clob createClob(String data) throws SQLException {
Clob clob = connection.connection().createClob();
clob.setString(1, data);

View File

@ -0,0 +1,252 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor;
import static io.debezium.connector.oracle.logminer.processor.TransactionCommitConsumer.LobFragment;
import static io.debezium.connector.oracle.logminer.processor.TransactionCommitConsumer.LobUnderConstruction;
import static org.fest.assertions.Assertions.assertThat;
import java.sql.SQLException;
import java.util.Random;
import org.junit.Test;
import oracle.sql.RAW;
public class LobUnderConstructionTest {
@Test
public void shouldBuildCorrectLobFromInitialValue() {
LobUnderConstruction lob = LobUnderConstruction.fromInitialValue(null);
assertThat(lob.isNull).isTrue();
assertThat(lob.fragments).isEmpty();
assertThat(lob.start).isEqualTo(0);
assertThat(lob.end).isEqualTo(0);
lob = LobUnderConstruction.fromInitialValue("EMPTY_BLOB()");
assertThat(lob.isNull).isFalse();
assertThat(lob.binary).isTrue();
assertThat(lob.fragments).isEmpty();
assertThat(lob.start).isEqualTo(0);
assertThat(lob.end).isEqualTo(0);
lob = LobUnderConstruction.fromInitialValue("EMPTY_CLOB()");
assertThat(lob.isNull).isFalse();
assertThat(lob.binary).isFalse();
assertThat(lob.fragments).isEmpty();
assertThat(lob.start).isEqualTo(0);
assertThat(lob.end).isEqualTo(0);
lob = LobUnderConstruction.fromInitialValue("0123456789");
assertThat(lob.isNull).isFalse();
assertThat(lob.binary).isFalse();
assertThat(lob.start).isEqualTo(0);
assertThat(lob.end).isEqualTo(10);
lob = LobUnderConstruction.fromInitialValue("HEXTORAW('0123456789')");
assertThat(lob.isNull).isFalse();
assertThat(lob.binary).isTrue();
assertThat(lob.start).isEqualTo(0);
assertThat(lob.end).isEqualTo(5);
LobUnderConstruction lob2 = LobUnderConstruction.fromInitialValue(lob);
assertThat(lob2 == lob).isTrue();
}
@Test
public void shouldMergeCorrectly() throws SQLException {
// 0 fragments
LobUnderConstruction lob = LobUnderConstruction.fromInitialValue(null);
assertThat(lob.merge()).isEqualTo(null);
lob = LobUnderConstruction.fromInitialValue("EMPTY_BLOB()");
assertThat(lob.merge()).isEqualTo("EMPTY_BLOB()");
lob = LobUnderConstruction.fromInitialValue("EMPTY_CLOB()");
assertThat(lob.merge()).isEqualTo("EMPTY_CLOB()");
// 1 fragment
lob = LobUnderConstruction.fromInitialValue("0123456789");
assertThat(lob.merge()).isEqualTo("0123456789");
lob = LobUnderConstruction.fromInitialValue("EMPTY_CLOB()");
lob.add(new LobFragment("0123456789"));
assertThat(lob.merge()).isEqualTo("0123456789");
lob = LobUnderConstruction.fromInitialValue("HEXTORAW('0123456789')");
assertThat(lob.merge()).isInstanceOf(byte[].class);
assertThat((byte[]) lob.merge()).isEqualTo(RAW.hexString2Bytes("0123456789"));
lob = LobUnderConstruction.fromInitialValue("EMPTY_BLOB()");
lob.add(new LobFragment("HEXTORAW('0123456789')"));
assertThat(lob.merge()).isInstanceOf(byte[].class);
assertThat((byte[]) lob.merge()).isEqualTo(RAW.hexString2Bytes("0123456789"));
// multiple fragments with holes
lob = LobUnderConstruction.fromInitialValue("0123456789");
LobFragment frag = new LobFragment("9876543210");
frag.offset = 11; // a gap of size one
lob.add(frag);
assertThat(lob.merge()).isEqualTo("0123456789 9876543210");
lob = LobUnderConstruction.fromInitialValue("HEXTORAW('0123456789')");
frag = new LobFragment("HEXTORAW('9876543210')");
frag.offset = 6; // a gap of size one
lob.add(frag);
assertThat((byte[]) lob.merge()).isEqualTo(RAW.hexString2Bytes("0123456789009876543210"));
}
private String randomString(int length) {
int lower = 'a';
int upper = 'z';
return new Random().ints(lower, upper + 1)
.limit(length)
.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
.toString();
}
private String encodeBytes(byte[] bytes, int offset, int length) {
StringBuilder builder = new StringBuilder("HEXTORAW('");
for (int i = offset; i < offset + length; i++) {
builder.append(String.format("%02x", bytes[i]));
}
builder.append("')");
return builder.toString();
}
@Test
public void shouldHandleNonLinearInsertionOrderWithOverwritesForBlob() {
byte[] contents = new byte[1024];
new Random().nextBytes(contents);
// back to front
LobUnderConstruction lob = new LobUnderConstruction();
for (int i = 0; i < 1024 / 16; i++) {
int offset = 1024 - 16 * i - 16;
LobFragment frag = new LobFragment(encodeBytes(contents, offset, 16));
frag.offset = offset;
lob.add(frag);
}
assertThat(lob.merge()).isEqualTo(contents);
// front-back-front-back
lob = new LobUnderConstruction();
for (int i = 0; i < 1024 / 32; i++) {
int frontOffset = 16 * i;
LobFragment frontFrag = new LobFragment(encodeBytes(contents, frontOffset, 16));
frontFrag.offset = frontOffset;
int backOffset = 1024 - 16 * i - 16;
LobFragment backFrag = new LobFragment(encodeBytes(contents, backOffset, 16));
backFrag.offset = backOffset;
lob.add(frontFrag);
lob.add(backFrag);
}
assertThat(lob.merge()).isEqualTo(contents);
// make gaps, fill gaps
lob = new LobUnderConstruction();
for (int i = 0; i < 1024 / 32; i++) {
int offset = 32 * i; // even slots
LobFragment frag = new LobFragment(encodeBytes(contents, offset, 16));
frag.offset = offset;
lob.add(frag);
}
for (int i = 0; i < 1024 / 32; i++) {
int offset = 32 * i + 16; // odd slots
LobFragment frag = new LobFragment(encodeBytes(contents, offset, 16));
frag.offset = offset;
lob.add(frag);
}
assertThat(lob.merge()).isEqualTo(contents);
// all overlapping writes
lob = new LobUnderConstruction();
for (int i = 0; i < 1024 / 16 - 1; i++) {
int offset = 16 * i;
LobFragment frag = new LobFragment(encodeBytes(contents, offset, 32));
frag.offset = offset;
lob.add(frag);
}
assertThat(lob.merge()).isEqualTo(contents);
// overwrite one large fragment back to front
byte[] otherContents = new byte[1024];
new Random().nextBytes(otherContents);
lob = LobUnderConstruction.fromInitialValue(encodeBytes(otherContents, 0, otherContents.length));
for (int i = 0; i < 1024 / 16; i++) {
int offset = 1024 - 16 * i - 16;
LobFragment frag = new LobFragment(encodeBytes(contents, offset, 16));
frag.offset = offset;
lob.add(frag);
// the number of fragments should never be greater than 10, because after every 10
// off-sequence insertions we coalesce the fragments to improve performance
assertThat(lob.fragments.size()).isLessThan(11);
}
assertThat(lob.merge()).isEqualTo(contents);
}
@Test
public void shouldHandleNonLinearInsertionOrderWithOverwritesForClob() {
String contents = randomString(1024);
// back to front
LobUnderConstruction lob = new LobUnderConstruction();
for (int i = 0; i < 1024 / 16; i++) {
int offset = 1024 - 16 * i - 16;
LobFragment frag = new LobFragment(contents.substring(offset, offset + 16));
frag.offset = offset;
lob.add(frag);
}
assertThat(lob.merge()).isEqualTo(contents);
// front-back-front-back
lob = new LobUnderConstruction();
for (int i = 0; i < 1024 / 32; i++) {
int frontOffset = 16 * i;
LobFragment frontFrag = new LobFragment(contents.substring(frontOffset, frontOffset + 16));
frontFrag.offset = frontOffset;
int backOffset = 1024 - 16 * i - 16;
LobFragment backFrag = new LobFragment(contents.substring(backOffset, backOffset + 16));
backFrag.offset = backOffset;
lob.add(frontFrag);
lob.add(backFrag);
}
assertThat(lob.merge()).isEqualTo(contents);
// make gaps, fill gaps
lob = new LobUnderConstruction();
for (int i = 0; i < 1024 / 32; i++) {
int offset = 32 * i; // even slots
LobFragment frag = new LobFragment(contents.substring(offset, offset + 16));
frag.offset = offset;
lob.add(frag);
}
for (int i = 0; i < 1024 / 32; i++) {
int offset = 32 * i + 16; // odd slots
LobFragment frag = new LobFragment(contents.substring(offset, offset + 16));
frag.offset = offset;
lob.add(frag);
}
assertThat(lob.merge()).isEqualTo(contents);
// all overlapping writes
lob = new LobUnderConstruction();
for (int i = 0; i < 1024 / 16 - 1; i++) {
int offset = 16 * i;
LobFragment frag = new LobFragment(contents.substring(offset, offset + 32));
frag.offset = offset;
lob.add(frag);
}
assertThat(lob.merge()).isEqualTo(contents);
// overwrite one large fragment back to front
lob = LobUnderConstruction.fromInitialValue(randomString(1024));
for (int i = 0; i < 1024 / 16; i++) {
int offset = 1024 - 16 * i - 16;
LobFragment frag = new LobFragment(contents.substring(offset, offset + 16));
frag.offset = offset;
lob.add(frag);
// the number of fragments should never be greater than 10, because after every 10
// off-sequence insertions we coalesce the fragments to improve performance
assertThat(lob.fragments.size()).isLessThan(11);
}
assertThat(lob.merge()).isEqualTo(contents);
}
}