DBZ-5295 Cache LOB column status in schema model

This commit is contained in:
Chris Cranford 2022-07-07 17:24:41 -04:00 committed by Jiri Pechanec
parent 32947ae512
commit 6735904075
7 changed files with 109 additions and 51 deletions

View File

@ -29,7 +29,6 @@
import io.debezium.relational.TableSchema;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
import oracle.jdbc.OracleTypes;
/**
* Base class to emit change data based on a single entry event.
@ -39,17 +38,20 @@ public abstract class BaseChangeRecordEmitter<T> extends RelationalChangeRecordE
private static final Logger LOGGER = LoggerFactory.getLogger(BaseChangeRecordEmitter.class);
private final OracleConnectorConfig connectorConfig;
private final byte[] unavailableValuePlaceholderBinary;
private final ByteBuffer unavailableValuePlaceholderBinary;
private final String unavailableValuePlaceholderString;
private final Object[] oldColumnValues;
private final Object[] newColumnValues;
private final OracleDatabaseSchema schema;
protected final Table table;
protected BaseChangeRecordEmitter(OracleConnectorConfig connectorConfig, Partition partition, OffsetContext offset,
Table table, Clock clock, Object[] oldColumnValues, Object[] newColumnValues) {
OracleDatabaseSchema schema, Table table, Clock clock, Object[] oldColumnValues,
Object[] newColumnValues) {
super(partition, offset, clock);
this.connectorConfig = connectorConfig;
this.unavailableValuePlaceholderBinary = connectorConfig.getUnavailableValuePlaceholder();
this.schema = schema;
this.unavailableValuePlaceholderBinary = ByteBuffer.wrap(connectorConfig.getUnavailableValuePlaceholder());
this.unavailableValuePlaceholderString = new String(connectorConfig.getUnavailableValuePlaceholder());
this.oldColumnValues = oldColumnValues;
this.newColumnValues = newColumnValues;
@ -120,21 +122,14 @@ protected void emitUpdateAsPrimaryKeyChangeRecord(Receiver receiver, TableSchema
* @return list of columns that should be reselected, which can be empty
*/
private List<Column> getReselectColumns(Struct newValue) {
// todo: eventually move this to the relational model to avoid needing to perform this iteration per change event
final List<Column> reselectColumns = new ArrayList<>();
for (Column column : table.columns()) {
switch (column.jdbcType()) {
case OracleTypes.CLOB:
case OracleTypes.NCLOB:
if (newValue.get(column.name()).equals(unavailableValuePlaceholderString)) {
reselectColumns.add(column);
}
break;
case OracleTypes.BLOB:
if (newValue.get(column.name()).equals(ByteBuffer.wrap(unavailableValuePlaceholderBinary))) {
reselectColumns.add(column);
}
break;
for (Column column : schema.getLobColumnsForTable(table.id())) {
final Object value = newValue.get(column.name());
if (OracleDatabaseSchema.isClobColumn(column) && unavailableValuePlaceholderString.equals(value)) {
reselectColumns.add(column);
}
else if (OracleDatabaseSchema.isBlobColumn(column) && unavailableValuePlaceholderBinary.equals(value)) {
reselectColumns.add(column);
}
}
return reselectColumns;

View File

@ -5,11 +5,18 @@
*/
package io.debezium.connector.oracle;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.oracle.StreamingAdapter.TableNameCaseSensitivity;
import io.debezium.connector.oracle.antlr.OracleDdlParser;
import io.debezium.relational.Column;
import io.debezium.relational.DefaultValueConverter;
import io.debezium.relational.HistorizedRelationalDatabaseSchema;
import io.debezium.relational.Table;
@ -19,6 +26,7 @@
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
import oracle.jdbc.OracleTypes;
/**
* The schema of an Oracle database.
@ -30,6 +38,8 @@ public class OracleDatabaseSchema extends HistorizedRelationalDatabaseSchema {
private static final Logger LOGGER = LoggerFactory.getLogger(OracleDatabaseSchema.class);
private final OracleDdlParser ddlParser;
private final ConcurrentMap<TableId, List<Column>> lobColumnsByTableId = new ConcurrentHashMap<>();
private boolean storageInitializationExecuted = false;
public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, OracleValueConverters valueConverters,
@ -106,4 +116,63 @@ public boolean isStorageInitializationExecuted() {
public boolean historyExists() {
return databaseHistory.exists();
}
@Override
protected void removeSchema(TableId id) {
super.removeSchema(id);
lobColumnsByTableId.remove(id);
}
@Override
protected void buildAndRegisterSchema(Table table) {
if (getTableFilter().isIncluded(table.id())) {
super.buildAndRegisterSchema(table);
// Cache LOB column mappings for performance
buildAndRegisterTableLobColumns(table);
}
}
/**
* Get a list of large object (LOB) columns for the specified relational table identifier.
*
* @param id the relational table identifier
* @return a list of LOB columns, may be empty if the table has no LOB columns
*/
public List<Column> getLobColumnsForTable(TableId id) {
return lobColumnsByTableId.getOrDefault(id, Collections.emptyList());
}
/**
* Returns whether the provided relational column model is a CLOB or NCLOB data type.
*/
public static boolean isClobColumn(Column column) {
return column.jdbcType() == OracleTypes.CLOB || column.jdbcType() == OracleTypes.NCLOB;
}
/**
* Returns whether the provided relational column model is a CLOB data type.
*/
public static boolean isBlobColumn(Column column) {
return column.jdbcType() == OracleTypes.BLOB;
}
private void buildAndRegisterTableLobColumns(Table table) {
final List<Column> lobColumns = new ArrayList<>();
for (Column column : table.columns()) {
switch (column.jdbcType()) {
case OracleTypes.CLOB:
case OracleTypes.NCLOB:
case OracleTypes.BLOB:
lobColumns.add(column);
break;
}
}
if (!lobColumns.isEmpty()) {
lobColumnsByTableId.put(table.id(), lobColumns);
}
else {
lobColumnsByTableId.remove(table.id());
}
}
}

View File

@ -9,6 +9,7 @@
import io.debezium.connector.oracle.BaseChangeRecordEmitter;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.logminer.events.EventType;
import io.debezium.data.Envelope.Operation;
import io.debezium.pipeline.spi.OffsetContext;
@ -24,14 +25,16 @@ public class LogMinerChangeRecordEmitter extends BaseChangeRecordEmitter<Object>
private final Operation operation;
public LogMinerChangeRecordEmitter(OracleConnectorConfig connectorConfig, Partition partition, OffsetContext offset,
Operation operation, Object[] oldValues, Object[] newValues, Table table, Clock clock) {
super(connectorConfig, partition, offset, table, clock, oldValues, newValues);
Operation operation, Object[] oldValues, Object[] newValues, Table table,
OracleDatabaseSchema schema, Clock clock) {
super(connectorConfig, partition, offset, schema, table, clock, oldValues, newValues);
this.operation = operation;
}
public LogMinerChangeRecordEmitter(OracleConnectorConfig connectorConfig, Partition partition, OffsetContext offset,
EventType eventType, Object[] oldValues, Object[] newValues, Table table, Clock clock) {
this(connectorConfig, partition, offset, getOperation(eventType), oldValues, newValues, table, clock);
EventType eventType, Object[] oldValues, Object[] newValues, Table table,
OracleDatabaseSchema schema, Clock clock) {
this(connectorConfig, partition, offset, getOperation(eventType), oldValues, newValues, table, schema, clock);
}
private static Operation getOperation(EventType eventType) {

View File

@ -6,13 +6,12 @@
package io.debezium.connector.oracle.logminer.parser;
import io.debezium.DebeziumException;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleValueConverters;
import io.debezium.connector.oracle.logminer.LogMinerHelper;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import oracle.jdbc.OracleTypes;
/**
* A simple DML parser implementation specifically for Oracle LogMiner.
*
@ -672,13 +671,10 @@ private Object getColumnUnavailableValue(Object value, Column column) {
return value;
}
switch (column.jdbcType()) {
case OracleTypes.CLOB:
case OracleTypes.NCLOB:
case OracleTypes.BLOB:
return OracleValueConverters.UNAVAILABLE_VALUE;
default:
return null;
if (OracleDatabaseSchema.isClobColumn(column) || OracleDatabaseSchema.isBlobColumn(column)) {
return OracleValueConverters.UNAVAILABLE_VALUE;
}
return null;
}
}

View File

@ -403,6 +403,7 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted
dmlEvent.getDmlEntry().getOldValues(),
dmlEvent.getDmlEntry().getNewValues(),
getSchema().tableFor(event.getTableId()),
getSchema(),
Clock.system());
}
else {
@ -414,6 +415,7 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted
dmlEvent.getDmlEntry().getOldValues(),
dmlEvent.getDmlEntry().getNewValues(),
getSchema().tableFor(event.getTableId()),
getSchema(),
Clock.system());
}
dispatcher.dispatchDataChangeEvent(partition, event.getTableId(), logMinerChangeRecordEmitter);

View File

@ -31,7 +31,6 @@
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import oracle.jdbc.OracleTypes;
import oracle.streams.ChunkColumnValue;
import oracle.streams.DDLLCR;
import oracle.streams.DefaultRowLCR;
@ -205,18 +204,13 @@ private void dispatchDataChangeEvent(RowLCR lcr, Map<String, Object> chunkValues
// is not explicitly provided in the map is initialized with the unavailable value
// marker object so its transformed correctly by the value converters.
// todo: would be useful in the future to track some type of "has-lob" flag on the table
// such a flag would allow us to conditionalize this loop and only apply it to tables
// which have LOB columns.
for (Column column : table.columns()) {
if (isLobColumn(column)) {
// again Xstream doesn't supply before state for LOB values; explicitly use unavailable value
oldChunkValues.put(column.name(), OracleValueConverters.UNAVAILABLE_VALUE);
if (!chunkValues.containsKey(column.name())) {
// Column not supplied, initialize with unavailable value marker
LOGGER.trace("\tColumn '{}' not supplied, initialized with unavailable value", column.name());
chunkValues.put(column.name(), OracleValueConverters.UNAVAILABLE_VALUE);
}
for (Column column : schema.getLobColumnsForTable(table.id())) {
// again Xstream doesn't supply before state for LOB values; explicitly use unavailable value
oldChunkValues.put(column.name(), OracleValueConverters.UNAVAILABLE_VALUE);
if (!chunkValues.containsKey(column.name())) {
// Column not supplied, initialize with unavailable value marker
LOGGER.trace("\tColumn '{}' not supplied, initialized with unavailable value", column.name());
chunkValues.put(column.name(), OracleValueConverters.UNAVAILABLE_VALUE);
}
}
@ -230,7 +224,9 @@ private void dispatchDataChangeEvent(RowLCR lcr, Map<String, Object> chunkValues
lcr,
oldChunkValues,
chunkValues,
schema.tableFor(tableId), clock));
schema.tableFor(tableId),
schema,
clock));
}
private void dispatchSchemaChangeEvent(DDLLCR ddlLcr) throws InterruptedException {
@ -357,10 +353,6 @@ public ChunkColumnValue createChunk() throws StreamsException {
throw new UnsupportedOperationException("Should never be called");
}
private boolean isLobColumn(Column column) {
return column.jdbcType() == OracleTypes.CLOB || column.jdbcType() == OracleTypes.NCLOB || column.jdbcType() == OracleTypes.BLOB;
}
private void resolveAndDispatchCurrentChunkedRow() {
try {
// Map of resolved chunk values

View File

@ -9,6 +9,7 @@
import io.debezium.connector.oracle.BaseChangeRecordEmitter;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.data.Envelope.Operation;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
@ -29,8 +30,8 @@ public class XStreamChangeRecordEmitter extends BaseChangeRecordEmitter<ColumnVa
public XStreamChangeRecordEmitter(OracleConnectorConfig connectorConfig, Partition partition, OffsetContext offset, RowLCR lcr,
Map<String, Object> oldChunkValues, Map<String, Object> newChunkValues,
Table table, Clock clock) {
super(connectorConfig, partition, offset, table, clock, getColumnValues(table, lcr.getOldValues(), oldChunkValues),
Table table, OracleDatabaseSchema schema, Clock clock) {
super(connectorConfig, partition, offset, schema, table, clock, getColumnValues(table, lcr.getOldValues(), oldChunkValues),
getColumnValues(table, lcr.getNewValues(), newChunkValues));
this.lcr = lcr;
}