DBZ-2948 Implement BLOB/CLOB support for Oracle XStream

Chris Cranford 2021-05-07 15:03:31 -04:00 committed by Gunnar Morling
parent fb7378ef52
commit 467a7f21f6
6 changed files with 118 additions and 13 deletions

View File

@@ -5,6 +5,15 @@
*/
package io.debezium.connector.oracle.xstream;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -20,6 +29,7 @@
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import oracle.sql.Datum;
import oracle.streams.ChunkColumnValue;
import oracle.streams.DDLLCR;
import oracle.streams.LCR;
@@ -46,6 +56,8 @@ class LcrEventHandler implements XStreamLCRCallbackHandler {
private final boolean tablenameCaseInsensitive;
private final XstreamStreamingChangeEventSource eventSource;
private final OracleStreamingChangeEventSourceMetrics streamingMetrics;
private final Map<String, List<ChunkColumnValue>> columnChunks;
private RowLCR currentRow;
public LcrEventHandler(OracleConnectorConfig connectorConfig, ErrorHandler errorHandler, EventDispatcher<TableId> dispatcher, Clock clock,
OracleDatabaseSchema schema, OracleOffsetContext offsetContext, boolean tablenameCaseInsensitive,
@@ -59,6 +71,7 @@ public LcrEventHandler(OracleConnectorConfig connectorConfig, ErrorHandler error
this.tablenameCaseInsensitive = tablenameCaseInsensitive;
this.eventSource = eventSource;
this.streamingMetrics = streamingMetrics;
this.columnChunks = new LinkedHashMap<>();
}
@Override
@@ -67,6 +80,7 @@ public void processLCR(LCR lcr) throws StreamsException {
// First set watermark to flush messages seen
setWatermark();
columnChunks.clear();
final LcrPosition lcrPosition = new LcrPosition(lcr.getPosition());
@@ -91,7 +105,7 @@ public void processLCR(LCR lcr) throws StreamsException {
try {
if (lcr instanceof RowLCR) {
dispatchDataChangeEvent((RowLCR) lcr);
processRowLCR((RowLCR) lcr);
}
else if (lcr instanceof DDLLCR) {
dispatchSchemaChangeEvent((DDLLCR) lcr);
@@ -108,7 +122,26 @@ else if (lcr instanceof DDLLCR) {
}
}
private void dispatchDataChangeEvent(RowLCR lcr) throws InterruptedException {
private void processRowLCR(RowLCR row) throws InterruptedException {
if (row.getCommandType().equals(RowLCR.LOB_ERASE)) {
LOGGER.warn("LOB_ERASE for table '{}' is not supported, "
+ "use DML operations to manipulate LOB columns only.", row.getObjectName());
return;
}
if (row.hasChunkData()) {
// If the row has chunk data, the RowLCR cannot be immediately dispatched.
// The handler needs to cache the current row and wait for the chunks to be delivered before
// the event can be safely dispatched. See processChunk below.
currentRow = row;
}
else {
// Since the row has no chunk data, it can be dispatched immediately.
dispatchDataChangeEvent(row, Collections.emptyMap());
}
}
private void dispatchDataChangeEvent(RowLCR lcr, Map<String, Object> chunkValues) throws InterruptedException {
LOGGER.debug("Processing DML event {}", lcr);
if (RowLCR.COMMIT.equals(lcr.getCommandType())) {
@@ -120,7 +153,7 @@ private void dispatchDataChangeEvent(RowLCR lcr) throws InterruptedException {
dispatcher.dispatchDataChangeEvent(
tableId,
new XStreamChangeRecordEmitter(offsetContext, lcr, schema.tableFor(tableId), clock));
new XStreamChangeRecordEmitter(offsetContext, lcr, chunkValues, schema.tableFor(tableId), clock));
}
private void dispatchSchemaChangeEvent(DDLLCR ddlLcr) throws InterruptedException {
@@ -191,8 +224,70 @@ else if (message.scn != null) {
}
@Override
public void processChunk(ChunkColumnValue arg0) throws StreamsException {
throw new UnsupportedOperationException("Not yet implemented");
public void processChunk(ChunkColumnValue chunk) throws StreamsException {
// Store the chunk in the chunk map
// Chunks will be processed once the end of the row is reached
columnChunks.computeIfAbsent(chunk.getColumnName(), v -> new ArrayList<>()).add(chunk);
if (chunk.isEndOfRow()) {
try {
// Map of resolved chunk values
Map<String, Object> resolvedChunkValues = new HashMap<>();
// All chunks have been dispatched to the event handler, combine the chunks now.
for (Map.Entry<String, List<ChunkColumnValue>> entry : columnChunks.entrySet()) {
final String columnName = entry.getKey();
final List<ChunkColumnValue> chunkValues = entry.getValue();
if (chunkValues.isEmpty()) {
LOGGER.trace("Column '{}' has no chunk values.", columnName);
continue;
}
final int type = chunkValues.get(0).getChunkType();
switch (type) {
case ChunkColumnValue.CLOB:
case ChunkColumnValue.NCLOB:
resolvedChunkValues.put(columnName, resolveClobChunkValue(chunkValues));
break;
case ChunkColumnValue.BLOB:
resolvedChunkValues.put(columnName, resolveBlobChunkValue(chunkValues));
break;
default:
throw new DebeziumException("An unsupported chunk type '" + type + "' for column '" + columnName + "'");
}
}
columnChunks.clear();
dispatchDataChangeEvent(currentRow, resolvedChunkValues);
}
catch (InterruptedException e) {
Thread.interrupted();
LOGGER.info("Received signal to stop, event loop will halt");
}
catch (SQLException e) {
throw new DebeziumException("Failed to process chunk data", e);
}
}
}
private String resolveClobChunkValue(List<ChunkColumnValue> chunkValues) throws SQLException {
StringBuilder data = new StringBuilder();
for (ChunkColumnValue chunkValue : chunkValues) {
data.append(chunkValue.getColumnData().stringValue());
}
return data.toString();
}
private byte[] resolveBlobChunkValue(List<ChunkColumnValue> chunkValues) {
long size = chunkValues.stream().map(ChunkColumnValue::getColumnData).mapToLong(Datum::getLength).sum();
ByteBuffer buffer = ByteBuffer.allocate((int) size);
for (ChunkColumnValue columnValue : chunkValues) {
buffer.put(columnValue.getColumnData().getBytes());
}
return buffer.array();
}
@Override
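
Note on the callback flow (an editor's reading of the change above, not part of the commit): XStream delivers LOB column data out of band. processLCR first receives a RowLCR whose hasChunkData() flag is set, so the handler caches the row; processChunk is then invoked once per chunk until isEndOfRow() returns true, at which point the buffered chunks are combined per column and the cached row is dispatched. A minimal, self-contained sketch of that buffer-then-combine pattern, using plain byte arrays in place of the oracle.streams types:

    import java.io.ByteArrayOutputStream;
    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class ChunkReassemblySketch {

        // Chunks buffered per column in arrival order, like the handler's columnChunks map
        private final Map<String, List<byte[]>> columnChunks = new LinkedHashMap<>();

        // Stands in for processChunk(ChunkColumnValue); endOfRow mirrors chunk.isEndOfRow()
        public Map<String, byte[]> onChunk(String columnName, byte[] data, boolean endOfRow) {
            columnChunks.computeIfAbsent(columnName, k -> new ArrayList<>()).add(data);
            if (!endOfRow) {
                return null; // keep buffering until the last chunk of the row arrives
            }
            Map<String, byte[]> resolved = new LinkedHashMap<>();
            for (Map.Entry<String, List<byte[]>> entry : columnChunks.entrySet()) {
                ByteArrayOutputStream combined = new ByteArrayOutputStream();
                for (byte[] chunk : entry.getValue()) {
                    combined.write(chunk, 0, chunk.length);
                }
                resolved.put(entry.getKey(), combined.toByteArray());
            }
            columnChunks.clear(); // reset for the next row, as processLCR also does defensively
            return resolved; // the caller would dispatch the cached RowLCR with these values
        }
    }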

View File

@@ -5,6 +5,8 @@
*/
package io.debezium.connector.oracle.xstream;
import java.util.Map;
import io.debezium.connector.oracle.BaseChangeRecordEmitter;
import io.debezium.data.Envelope.Operation;
import io.debezium.pipeline.spi.OffsetContext;
@@ -22,10 +24,12 @@
public class XStreamChangeRecordEmitter extends BaseChangeRecordEmitter<ColumnValue> {
private final RowLCR lcr;
private final Map<String, Object> chunkValues;
public XStreamChangeRecordEmitter(OffsetContext offset, RowLCR lcr, Table table, Clock clock) {
public XStreamChangeRecordEmitter(OffsetContext offset, RowLCR lcr, Map<String, Object> chunkValues, Table table, Clock clock) {
super(offset, table, clock);
this.lcr = lcr;
this.chunkValues = chunkValues;
}
@Override
@@ -61,4 +65,17 @@ protected String getColumnName(ColumnValue columnValue) {
protected Object getColumnData(ColumnValue columnValue) {
return columnValue.getColumnData();
}
@Override
protected Object[] getColumnValues(ColumnValue[] columnValues) {
Object[] values = super.getColumnValues(columnValues);
// Overlay chunk values into non-chunk value array
for (Map.Entry<String, Object> entry : chunkValues.entrySet()) {
final int index = table.columnWithName(entry.getKey()).position() - 1;
values[index] = entry.getValue();
}
return values;
}
}
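
The getColumnValues override above overlays each reassembled LOB value into the row's value array at the column's zero-based index (Column.position() is one-based, hence the - 1). An illustrative, self-contained sketch of that overlay with a hypothetical three-column row where only the LOB column arrived as chunks:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    public class ChunkOverlaySketch {
        public static void main(String[] args) {
            // Hypothetical table: positions 1..3 are ID, NAME, VAL_CLOB
            List<String> columns = Arrays.asList("ID", "NAME", "VAL_CLOB");

            // Values decoded from the RowLCR; the chunked column is still unset
            Object[] values = { 1, "test", null };

            // Reassembled chunk data keyed by column name, as built in LcrEventHandler
            Map<String, Object> chunkValues = Map.of("VAL_CLOB", "...reassembled clob text...");

            for (Map.Entry<String, Object> entry : chunkValues.entrySet()) {
                // indexOf stands in for table.columnWithName(name).position() - 1
                int index = columns.indexOf(entry.getKey());
                values[index] = entry.getValue();
            }

            System.out.println(Arrays.toString(values)); // [1, test, ...reassembled clob text...]
        }
    }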

View File

@@ -31,7 +31,6 @@
import io.debezium.config.Configuration;
import io.debezium.config.Configuration.Builder;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIs;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.SchemaAndValueField;
import io.debezium.data.VariableScaleDecimal;
@@ -593,7 +592,6 @@ public void timeTypesAsConnect() throws Exception {
}
@Test
@SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.XSTREAM, reason = "XStream doesn't yet support CLOB data types")
public void clobTypes() throws Exception {
int expectedRecordCount = 0;

View File

@@ -29,7 +29,6 @@
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIs;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.Envelope;
import io.debezium.data.VerifyRecord;
@@ -44,7 +43,6 @@
*
* @author Chris Cranford
*/
@SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.XSTREAM, reason = "XStream does not yet support BLOB data types")
public class OracleBlobDataTypesIT extends AbstractConnectorTest {
private static final byte[] BIN_DATA = readBinaryData("data/test_lob_data.json");

View File

@@ -24,7 +24,6 @@
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIs;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.Envelope;
import io.debezium.data.VerifyRecord;
@@ -38,7 +37,6 @@
*
* @author Chris Cranford
*/
@SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.XSTREAM, reason = "XStream does not yet support BLOB data types")
public class OracleClobDataTypeIT extends AbstractConnectorTest {
private static final String JSON_DATA = Testing.Files.readResourceAsString("data/test_lob_data.json");

View File

@@ -1576,7 +1576,6 @@ record = records.recordsForTopic("server1.DEBEZIUM.TEST_IOT").get(0);
// todo: should this test be removed since it's now covered in OracleClobDataTypeIT?
@Test
@FixFor("DBZ-3257")
@SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER)
public void shouldSnapshotAndStreamClobDataTypes() throws Exception {
TestHelper.dropTable(connection, "clob_test");
try {
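
The three test-file changes above simply drop the adapter-based skips now that XStream handles LOB columns. For reference, a hedged sketch of how that skip mechanism is wired (class and test names here are hypothetical; the rule and annotation come from the io.debezium.connector.oracle.junit imports shown in the diffs):

    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.rules.TestRule;

    import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;

    public class LobSupportExampleIT {

        // The rule checks each test's skip annotations against the configured
        // connector adapter and aborts the test on a match
        @Rule
        public TestRule skipRule = new SkipTestDependingOnAdapterNameRule();

        @Test
        // Before this commit, a LOB test would carry the annotation below;
        // removing it lets the test run against the XStream adapter as well:
        // @SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.XSTREAM, reason = "...")
        public void streamsLobColumns() throws Exception {
            // ... exercise BLOB/CLOB capture here ...
        }
    }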