DBZ-8071 Cap cache size by internal config, moved to schema class, small code clean-up

Chris Cranford 2024-09-03 06:15:49 -04:00 committed by Chris Cranford
parent f4a2854c85
commit 2c0eea6b25
6 changed files with 96 additions and 45 deletions
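
At a glance, the change replaces an unbounded HashMap that was rebuilt on every mining iteration with a capacity-bounded, least-recently-used cache owned by the schema, sized by a new internal option. For readers unfamiliar with the pattern, here is a minimal, illustrative sketch of a size-capped LRU map using only the JDK; the connector itself uses io.debezium.util.LRUCacheMap (last file below), not this class:

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: a size-capped LRU map. With accessOrder=true,
// LinkedHashMap reorders entries on access, and removeEldestEntry
// evicts the least-recently-used entry once capacity is exceeded.
class BoundedLruMap<K, V> extends LinkedHashMap<K, V> {
    private final int capacity;

    BoundedLruMap(int capacity) {
        super(16, 0.75f, true); // true = access order, required for LRU
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > capacity;
    }
}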

View File

@@ -685,8 +685,8 @@ protected Map<TableId, List<Attribute>> getAttributeDetails(TableId tableId, Str
getDatabaseObjectDetails(tableId, tableType, (objectId, dataObjectId) -> {
LOGGER.info("\tRegistering '{}' attributes: object_id={}, data_object_id={}", tableId, objectId, dataObjectId);
final List<Attribute> attributes = new ArrayList<>();
attributes.add(Attribute.editor().name("OBJECT_ID").value(objectId).create());
attributes.add(Attribute.editor().name("DATA_OBJECT_ID").value(dataObjectId).create());
attributes.add(Attribute.editor().name(OracleDatabaseSchema.ATTRIBUTE_OBJECT_ID).value(objectId).create());
attributes.add(Attribute.editor().name(OracleDatabaseSchema.ATTRIBUTE_DATA_OBJECT_ID).value(dataObjectId).create());
results.put(tableId, attributes);
});
return results;
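
For context, attributes registered this way can later be read back from the table metadata, which is exactly how the schema-level lookup added by this commit resolves object ids. A minimal sketch, assuming a Table that was registered as above:

import io.debezium.relational.Attribute;
import io.debezium.relational.Table;

// Sketch: reading the registered object id back from table metadata.
// attributeWithName returns null if the attribute was never registered.
static Long objectIdOf(Table table) {
    Attribute attribute = table.attributeWithName(OracleDatabaseSchema.ATTRIBUTE_OBJECT_ID);
    return attribute != null ? attribute.asLong() : null;
}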

View File

@@ -649,6 +649,16 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
.withDescription("(Deprecated) if true, CONTINUOUS_MINE option will be added to the log mining session. " +
"This will manage log files switches seamlessly.");
public static final Field OBJECT_ID_CACHE_SIZE = Field.createInternal("object.id.cache.size")
.withDisplayName("Controls the maximum size of the object ID cache")
.withType(Type.INT)
.withWidth(Width.SHORT)
.withDefault(10)
.withImportance(Importance.LOW)
.withValidation(OracleConnectorConfig::validateObjectIdCacheSize)
.withDescription("The connector maintains a least-recently used cache of database table object ID to name mappings. "
+ "This controls the maximum capacity of this cache.");
private static final ConfigDefinition CONFIG_DEFINITION = HistorizedRelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
.name("Oracle")
.excluding(
@@ -724,7 +734,8 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
LOG_MINING_BUFFER_EHCACHE_TRANSACTIONS_CONFIG,
LOG_MINING_BUFFER_EHCACHE_PROCESSED_TRANSACTIONS_CONFIG,
LOG_MINING_BUFFER_EHCACHE_SCHEMA_CHANGES_CONFIG,
LOG_MINING_BUFFER_EHCACHE_EVENTS_CONFIG)
LOG_MINING_BUFFER_EHCACHE_EVENTS_CONFIG,
OBJECT_ID_CACHE_SIZE)
.events(SOURCE_INFO_STRUCT_MAKER)
.create();
@@ -755,6 +766,7 @@ public static ConfigDef configDef() {
private final SnapshotLockingMode snapshotLockingMode;
private final int queryFetchSize;
private final int snapshotRetryDatabaseErrorsMaxRetries;
private final int objectIdToTableIdCacheSize;
// LogMiner options
private final LogMiningStrategy logMiningStrategy;
@@ -818,6 +830,7 @@ public OracleConnectorConfig(Configuration config) {
this.connectorAdapter = ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER));
this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString());
this.lobEnabled = config.getBoolean(LOB_ENABLED);
this.objectIdToTableIdCacheSize = config.getInteger(OBJECT_ID_CACHE_SIZE);
this.streamingAdapter = this.connectorAdapter.getInstance(this);
if (this.streamingAdapter == null) {
@@ -1955,6 +1968,15 @@ public Configuration getLogMiningEhcacheConfiguration() {
return logMiningEhCacheConfiguration;
}
/**
* Return the object id to table id cache size
*
* @return the maximum size of the object id to table id cache
*/
public int getObjectIdToTableIdCacheSize() {
return objectIdToTableIdCacheSize;
}
@Override
public String getConnectorName() {
return Module.name();
@@ -2153,4 +2175,12 @@ public static int validateEhcacheConfigFieldRequired(Configuration config, Field
}
return 0;
}
public static int validateObjectIdCacheSize(Configuration config, Field field, ValidationOutput problems) {
int result = Field.isRequired(config, field, problems);
if (result != 0) {
return result;
}
return Field.isPositiveInteger(config, field, problems);
}
}
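
Because the option is created with Field.createInternal, it is not part of the documented connector configuration, but it can still be set like any other property. A sketch of overriding it programmatically (the value 64 is arbitrary; anything failing validateObjectIdCacheSize, i.e. missing or non-positive, is rejected):

import io.debezium.config.Configuration;

// Sketch: overriding the internal cache capacity; the default is 10.
static Configuration objectIdCacheConfig() {
    return Configuration.create()
            .with(OracleConnectorConfig.OBJECT_ID_CACHE_SIZE, 64)
            .build();
}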

View File

@@ -9,6 +9,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -17,6 +18,7 @@
import io.debezium.connector.oracle.StreamingAdapter.TableNameCaseSensitivity;
import io.debezium.connector.oracle.antlr.OracleDdlParser;
import io.debezium.relational.Attribute;
import io.debezium.relational.Column;
import io.debezium.relational.DefaultValueConverter;
import io.debezium.relational.HistorizedRelationalDatabaseSchema;
@@ -27,6 +29,7 @@
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.LRUCacheMap;
import oracle.jdbc.OracleTypes;
@@ -39,11 +42,13 @@ public class OracleDatabaseSchema extends HistorizedRelationalDatabaseSchema {
private static final Logger LOGGER = LoggerFactory.getLogger(OracleDatabaseSchema.class);
public static final String ATTRIBUTE_OBJECT_ID = "OBJECT_ID";
public static final String ATTRIBUTE_DATA_OBJECT_ID = "DATA_OBJECT_ID";
private final OracleDdlParser ddlParser;
private final ConcurrentMap<TableId, List<Column>> lobColumnsByTableId = new ConcurrentHashMap<>();
private final OracleValueConverters valueConverters;
private boolean storageInitializationExecuted = false;
private final LRUCacheMap<Long, TableId> objectIdToTableId;
public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, OracleValueConverters valueConverters,
DefaultValueConverter defaultValueConverter, SchemaNameAdjuster schemaNameAdjuster,
@@ -68,6 +73,8 @@ public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, OracleValueCo
connectorConfig.isSchemaCommentsHistoryEnabled(),
valueConverters,
connectorConfig.getTableFilters().dataCollectionFilter());
this.objectIdToTableId = new LRUCacheMap<>(connectorConfig.getObjectIdToTableIdCacheSize());
}
public Tables getTables() {
@@ -121,9 +128,45 @@ protected void buildAndRegisterSchema(Table table) {
// Cache LOB column mappings for performance
buildAndRegisterTableLobColumns(table);
// Cache Object ID to Table ID for performance
buildAndRegisterTableObjectIdReferences(table);
}
}
/**
* Get the {@link TableId} by {@code objectId}.
*
* @param objectId the object id to find the table id for, cannot be {@code null}
* @param dataObjectId the data object id, may be {@code null}
* @return the table identifier, or {@code null} if no table with the given object id is cached or found in the schema
*/
public TableId getTableIdByObjectId(Long objectId, Long dataObjectId) {
Objects.requireNonNull(objectId, "The database table object id must not be null");
// Internally we cache this using a bounded cache for performance reasons, particularly when a
// transaction may refer to the same table for consecutive DML events. This avoids the need to
// iterate the list of tables on each DML event observed.
return objectIdToTableId.computeIfAbsent(objectId, (tableObjectId) -> {
for (TableId tableId : tableIds()) {
final Table table = tableFor(tableId);
final Attribute attribute = table.attributeWithName(ATTRIBUTE_OBJECT_ID);
if (attribute != null && attribute.asLong().equals(tableObjectId)) {
if (dataObjectId != null) {
final Attribute dataAttribute = table.attributeWithName(ATTRIBUTE_DATA_OBJECT_ID);
if (dataAttribute == null || !dataAttribute.asLong().equals(dataObjectId)) {
// Did not match, continue
continue;
}
}
LOGGER.debug("Table lookup for object {} resolved to '{}'", tableObjectId, table.id());
return table.id();
}
}
LOGGER.debug("Table lookup for object id {} did not find a match.", tableObjectId);
return null;
});
}
/**
* Get a list of large object (LOB) columns for the specified relational table identifier.
*
@@ -175,6 +218,13 @@ private static boolean isBlobColumn(Column column) {
return column.jdbcType() == OracleTypes.BLOB;
}
private void buildAndRegisterTableObjectIdReferences(Table table) {
final Attribute attribute = table.attributeWithName(ATTRIBUTE_OBJECT_ID);
if (attribute != null) {
objectIdToTableId.put(attribute.asLong(), table.id());
}
}
private void buildAndRegisterTableLobColumns(Table table) {
final List<Column> lobColumns = new ArrayList<>();
for (Column column : table.columns()) {

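Callers now resolve dropped or purged tables through this single entry point rather than iterating the schema themselves; a usage sketch (the object id is made up for illustration):

import io.debezium.relational.TableId;

// Sketch: the first call may scan all known tables; repeat calls for the
// same object id are served from the bounded LRU cache. The dataObjectId
// argument may be null when only the object id is known.
static TableId resolveDroppedTable(OracleDatabaseSchema schema) {
    return schema.getTableIdByObjectId(51234L, null); // 51234L: arbitrary example id
}
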
View File

@@ -225,8 +225,8 @@ void applyTableObjectAttributes(TableId tableId) {
if (table != null) {
// If the table has not yet been registered, there is nothing to apply
final TableEditor editor = table.edit();
editor.addAttribute(Attribute.editor().name("OBJECT_ID").value(objectId).create());
editor.addAttribute(Attribute.editor().name("DATA_OBJECT_ID").value(dataObjectId).create());
editor.addAttribute(Attribute.editor().name(OracleDatabaseSchema.ATTRIBUTE_OBJECT_ID).value(objectId).create());
editor.addAttribute(Attribute.editor().name(OracleDatabaseSchema.ATTRIBUTE_DATA_OBJECT_ID).value(dataObjectId).create());
schema.getTables().overwriteTable(editor.create());
}
else {

View File

@@ -66,7 +66,6 @@
import io.debezium.data.Envelope;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
import io.debezium.relational.Attribute;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
@@ -112,7 +111,6 @@ public abstract class AbstractLogMinerEventProcessor<T extends Transaction> impl
private Instant lastProcessedScnChangeTime = null;
private Scn lastProcessedScn = Scn.NULL;
private boolean sequenceUnavailable = false;
private HashMap<Long, TableId> objectIdToTableIdCache = new HashMap<>();
private final Set<String> abandonedTransactionsCache = new HashSet<>();
private final InMemoryPendingTransactionsCache inMemoryPendingTransactionsCache = new InMemoryPendingTransactionsCache();
@@ -270,9 +268,6 @@ protected boolean isTrxIdRawValue() {
public Scn process(Scn startScn, Scn endScn) throws SQLException, InterruptedException {
counters.reset();
// Recreate the cache on each iteration
objectIdToTableIdCache = new HashMap<>();
try (PreparedStatement statement = createQueryStatement()) {
LOGGER.debug("Fetching results for SCN [{}, {}]", startScn, endScn);
statement.setFetchSize(getConfig().getQueryFetchSize());
@@ -486,7 +481,7 @@ protected void processRow(OraclePartition partition, LogMinerEventRow row) throw
// Special use case where the table has been dropped and purged, and we are processing an
// old event for the table that comes prior to the drop.
LOGGER.debug("Found DML for dropped table in history with object-id based table name {}.", row.getTableId().table());
final TableId tableId = getTableIdByObjectId(row.getObjectId());
final TableId tableId = schema.getTableIdByObjectId(row.getObjectId(), null);
if (tableId != null) {
row.setTableId(tableId);
}
@@ -1406,15 +1401,9 @@ private TableId getTableIdForDataEvent(LogMinerEventRow row) throws SQLException
}
else if (tableId.table().equalsIgnoreCase("UNKNOWN")) {
// Object has been dropped and purged.
for (TableId schemaTableId : schema.tableIds()) {
final Table table = schema.tableFor(schemaTableId);
final Attribute objectId = table.attributeWithName("OBJECT_ID");
final Attribute dataObjectId = table.attributeWithName("DATA_OBJECT_ID");
if (objectId != null && dataObjectId != null) {
if (row.getObjectId() == objectId.asLong() && row.getDataObjectId() == dataObjectId.asLong()) {
return table.id();
}
}
final TableId resolvedTableId = schema.getTableIdByObjectId(row.getObjectId(), row.getDataObjectId());
if (resolvedTableId != null) {
return resolvedTableId;
}
throw new DebeziumException("Failed to resolve UNKNOWN table name by object id lookup");
}
@@ -1961,29 +1950,6 @@ protected int oldestTransactionComparison(T first, T second) {
return comparison;
}
/**
* Maintains a cache, for the duration of a mining step, of table object id to {@link TableId} mappings.
* This avoids expensive iteration over all tables when multiple changes for the same object id
* require the same lookup.
*
* @param objectId the table object id to look up
* @return the matching {@link TableId} if found, or {@code null} if no match is found
*/
private TableId getTableIdByObjectId(Long objectId) {
return objectIdToTableIdCache.computeIfAbsent(objectId, (tableObjectId) -> {
for (TableId tableId : schema.tableIds()) {
final Table table = schema.tableFor(tableId);
final Attribute attribute = table.attributeWithName("OBJECT_ID");
if (attribute != null && attribute.asLong().equals(tableObjectId)) {
LOGGER.debug("Table lookup for object id {} resolved to '{}'", tableObjectId, table.id());
return table.id();
}
}
LOGGER.debug("Table lookup for object id {} did not find a match.", tableObjectId);
return null;
});
}
/**
* Wrapper for all counter variables
*

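One behavioral detail carried over from the removed implementation: java.util.Map.computeIfAbsent stores nothing when the mapping function returns null, so unresolved object ids are not negatively cached and will be looked up again on the next call. A self-contained JDK demonstration of that semantic:

import java.util.HashMap;
import java.util.Map;

public class ComputeIfAbsentNullDemo {
    public static void main(String[] args) {
        Map<Long, String> cache = new HashMap<>();
        cache.computeIfAbsent(42L, id -> null);   // returns null, stores nothing
        System.out.println(cache.size());         // 0 -- no negative caching
        cache.computeIfAbsent(42L, id -> "hit");  // computed and stored this time
        System.out.println(cache.size());         // 1
    }
}
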
View File

@@ -9,6 +9,7 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.kafka.common.cache.Cache;
@@ -66,4 +67,8 @@ public Collection<V> values() {
public String toString() {
return delegate.toString();
}
public V computeIfAbsent(K key, Function<K, V> mappingFunction) {
return delegate.computeIfAbsent(key, mappingFunction);
}
}
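
A usage sketch of the new method, assuming the standard compute-on-miss semantics of the delegate (capacity 2 is arbitrary):

import io.debezium.util.LRUCacheMap;

// Sketch: the mapping function runs only on a cache miss.
static void demo() {
    LRUCacheMap<Long, String> cache = new LRUCacheMap<>(2);
    String first = cache.computeIfAbsent(1L, id -> "A"); // miss: computes and caches "A"
    String again = cache.computeIfAbsent(1L, id -> "B"); // hit: returns cached "A"
}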