diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java index b4cf26c13..a411f0625 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java @@ -685,8 +685,8 @@ protected Map> getAttributeDetails(TableId tableId, Str getDatabaseObjectDetails(tableId, tableType, (objectId, dataObjectId) -> { LOGGER.info("\tRegistering '{}' attributes: object_id={}, data_object_id={}", tableId, objectId, dataObjectId); final List attributes = new ArrayList<>(); - attributes.add(Attribute.editor().name("OBJECT_ID").value(objectId).create()); - attributes.add(Attribute.editor().name("DATA_OBJECT_ID").value(dataObjectId).create()); + attributes.add(Attribute.editor().name(OracleDatabaseSchema.ATTRIBUTE_OBJECT_ID).value(objectId).create()); + attributes.add(Attribute.editor().name(OracleDatabaseSchema.ATTRIBUTE_DATA_OBJECT_ID).value(dataObjectId).create()); results.put(tableId, attributes); }); return results; diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java index 01e8d405b..58a4d15a0 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java @@ -649,6 +649,16 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector .withDescription("(Deprecated) if true, CONTINUOUS_MINE option will be added to the log mining session. " + "This will manage log files switches seamlessly."); + public static final Field OBJECT_ID_CACHE_SIZE = Field.createInternal("object.id.cache.size") + .withDisplayName("Controls the maximum size of the object ID cache") + .withType(Type.INT) + .withWidth(Width.SHORT) + .withDefault(10) + .withImportance(Importance.LOW) + .withValidation(OracleConnectorConfig::validateObjectIdCacheSize) + .withDescription("The connector maintains a least-recently used cache of database table object ID to name mappings. " + + "This controls the maximum capacity of this cache."); + private static final ConfigDefinition CONFIG_DEFINITION = HistorizedRelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit() .name("Oracle") .excluding( @@ -724,7 +734,8 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector LOG_MINING_BUFFER_EHCACHE_TRANSACTIONS_CONFIG, LOG_MINING_BUFFER_EHCACHE_PROCESSED_TRANSACTIONS_CONFIG, LOG_MINING_BUFFER_EHCACHE_SCHEMA_CHANGES_CONFIG, - LOG_MINING_BUFFER_EHCACHE_EVENTS_CONFIG) + LOG_MINING_BUFFER_EHCACHE_EVENTS_CONFIG, + OBJECT_ID_CACHE_SIZE) .events(SOURCE_INFO_STRUCT_MAKER) .create(); @@ -755,6 +766,7 @@ public static ConfigDef configDef() { private final SnapshotLockingMode snapshotLockingMode; private final int queryFetchSize; private final int snapshotRetryDatabaseErrorsMaxRetries; + private final int objectIdToTableIdCacheSize; // LogMiner options private final LogMiningStrategy logMiningStrategy; @@ -818,6 +830,7 @@ public OracleConnectorConfig(Configuration config) { this.connectorAdapter = ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER)); this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString()); this.lobEnabled = config.getBoolean(LOB_ENABLED); + this.objectIdToTableIdCacheSize = config.getInteger(OBJECT_ID_CACHE_SIZE); this.streamingAdapter = this.connectorAdapter.getInstance(this); if (this.streamingAdapter == null) { @@ -1955,6 +1968,15 @@ public Configuration getLogMiningEhcacheConfiguration() { return logMiningEhCacheConfiguration; } + /** + * Return the object id to table id cache size + * + * @return the maximum size of the object id to table id cache + */ + public int getObjectIdToTableIdCacheSize() { + return objectIdToTableIdCacheSize; + } + @Override public String getConnectorName() { return Module.name(); @@ -2153,4 +2175,12 @@ public static int validateEhcacheConfigFieldRequired(Configuration config, Field } return 0; } + + public static int validateObjectIdCacheSize(Configuration config, Field field, ValidationOutput problems) { + int result = Field.isRequired(config, field, problems); + if (result != 0) { + return result; + } + return Field.isPositiveInteger(config, field, problems); + } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDatabaseSchema.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDatabaseSchema.java index 0f9977eff..13d25285f 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDatabaseSchema.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDatabaseSchema.java @@ -9,6 +9,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -17,6 +18,7 @@ import io.debezium.connector.oracle.StreamingAdapter.TableNameCaseSensitivity; import io.debezium.connector.oracle.antlr.OracleDdlParser; +import io.debezium.relational.Attribute; import io.debezium.relational.Column; import io.debezium.relational.DefaultValueConverter; import io.debezium.relational.HistorizedRelationalDatabaseSchema; @@ -27,6 +29,7 @@ import io.debezium.schema.SchemaChangeEvent; import io.debezium.schema.SchemaNameAdjuster; import io.debezium.spi.topic.TopicNamingStrategy; +import io.debezium.util.LRUCacheMap; import oracle.jdbc.OracleTypes; @@ -39,11 +42,13 @@ public class OracleDatabaseSchema extends HistorizedRelationalDatabaseSchema { private static final Logger LOGGER = LoggerFactory.getLogger(OracleDatabaseSchema.class); + public static final String ATTRIBUTE_OBJECT_ID = "OBJECT_ID"; + public static final String ATTRIBUTE_DATA_OBJECT_ID = "DATA_OBJECT_ID"; + private final OracleDdlParser ddlParser; private final ConcurrentMap> lobColumnsByTableId = new ConcurrentHashMap<>(); private final OracleValueConverters valueConverters; - - private boolean storageInitializationExecuted = false; + private final LRUCacheMap objectIdToTableId; public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, OracleValueConverters valueConverters, DefaultValueConverter defaultValueConverter, SchemaNameAdjuster schemaNameAdjuster, @@ -68,6 +73,8 @@ public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, OracleValueCo connectorConfig.isSchemaCommentsHistoryEnabled(), valueConverters, connectorConfig.getTableFilters().dataCollectionFilter()); + + this.objectIdToTableId = new LRUCacheMap<>(connectorConfig.getObjectIdToTableIdCacheSize()); } public Tables getTables() { @@ -121,9 +128,45 @@ protected void buildAndRegisterSchema(Table table) { // Cache LOB column mappings for performance buildAndRegisterTableLobColumns(table); + + // Cache Object ID to Table ID for performance + buildAndRegisterTableObjectIdReferences(table); } } + /** + * Get the {@link TableId} by {@code objectId}. + * + * @param objectId the object id find the table id about, cannot be {@code null} + * @param dataObjectId the data object id, may be {@code null} + * @return the table identifier or {@code null} if no entry is cached or in the schema with the object id + */ + public TableId getTableIdByObjectId(Long objectId, Long dataObjectId) { + Objects.requireNonNull(objectId, "The database table object id is null and is not allowed"); + // Internally we cache this using a bounded cache for performance reasons, particularly when a + // transaction may refer to the same table for consecutive DML events. This avoids the need to + // iterate the list of tables on each DML event observed. + return objectIdToTableId.computeIfAbsent(objectId, (tableObjectId) -> { + for (TableId tableId : tableIds()) { + final Table table = tableFor(tableId); + final Attribute attribute = table.attributeWithName(ATTRIBUTE_OBJECT_ID); + if (attribute != null && attribute.asLong().equals(tableObjectId)) { + if (dataObjectId != null) { + final Attribute dataAttribute = table.attributeWithName(ATTRIBUTE_DATA_OBJECT_ID); + if (dataAttribute == null || !dataAttribute.asLong().equals(dataObjectId)) { + // Did not match, continue + continue; + } + } + LOGGER.debug("Table lookup for object {} resolved to '{}'", tableObjectId, table.id()); + return table.id(); + } + } + LOGGER.debug("Table lookup for object id {} did not find a match.", tableObjectId); + return null; + }); + } + /** * Get a list of large object (LOB) columns for the specified relational table identifier. * @@ -175,6 +218,13 @@ private static boolean isBlobColumn(Column column) { return column.jdbcType() == OracleTypes.BLOB; } + private void buildAndRegisterTableObjectIdReferences(Table table) { + final Attribute attribute = table.attributeWithName(ATTRIBUTE_OBJECT_ID); + if (attribute != null) { + objectIdToTableId.put(attribute.asLong(), table.id()); + } + } + private void buildAndRegisterTableLobColumns(Table table) { final List lobColumns = new ArrayList<>(); for (Column column : table.columns()) { diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSchemaChangeEventEmitter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSchemaChangeEventEmitter.java index 632580024..40e46f550 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSchemaChangeEventEmitter.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSchemaChangeEventEmitter.java @@ -225,8 +225,8 @@ void applyTableObjectAttributes(TableId tableId) { if (table != null) { // If the table has not yet been registered, there is nothing to apply final TableEditor editor = table.edit(); - editor.addAttribute(Attribute.editor().name("OBJECT_ID").value(objectId).create()); - editor.addAttribute(Attribute.editor().name("DATA_OBJECT_ID").value(dataObjectId).create()); + editor.addAttribute(Attribute.editor().name(OracleDatabaseSchema.ATTRIBUTE_OBJECT_ID).value(objectId).create()); + editor.addAttribute(Attribute.editor().name(OracleDatabaseSchema.ATTRIBUTE_DATA_OBJECT_ID).value(dataObjectId).create()); schema.getTables().overwriteTable(editor.create()); } else { diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java index 8399d494b..34fc8a735 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java @@ -66,7 +66,6 @@ import io.debezium.data.Envelope; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext; -import io.debezium.relational.Attribute; import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.relational.Tables; @@ -112,7 +111,6 @@ public abstract class AbstractLogMinerEventProcessor impl private Instant lastProcessedScnChangeTime = null; private Scn lastProcessedScn = Scn.NULL; private boolean sequenceUnavailable = false; - private HashMap objectIdToTableIdCache = new HashMap<>(); private final Set abandonedTransactionsCache = new HashSet<>(); private final InMemoryPendingTransactionsCache inMemoryPendingTransactionsCache = new InMemoryPendingTransactionsCache(); @@ -270,9 +268,6 @@ protected boolean isTrxIdRawValue() { public Scn process(Scn startScn, Scn endScn) throws SQLException, InterruptedException { counters.reset(); - // Recreate the cache on each iteration - objectIdToTableIdCache = new HashMap<>(); - try (PreparedStatement statement = createQueryStatement()) { LOGGER.debug("Fetching results for SCN [{}, {}]", startScn, endScn); statement.setFetchSize(getConfig().getQueryFetchSize()); @@ -486,7 +481,7 @@ protected void processRow(OraclePartition partition, LogMinerEventRow row) throw // Special use case where the table has been dropped and purged, and we are processing an // old event for the table that comes prior to the drop. LOGGER.debug("Found DML for dropped table in history with object-id based table name {}.", row.getTableId().table()); - final TableId tableId = getTableIdByObjectId(row.getObjectId()); + final TableId tableId = schema.getTableIdByObjectId(row.getObjectId(), null); if (tableId != null) { row.setTableId(tableId); } @@ -1406,15 +1401,9 @@ private TableId getTableIdForDataEvent(LogMinerEventRow row) throws SQLException } else if (tableId.table().equalsIgnoreCase("UNKNOWN")) { // Object has been dropped and purged. - for (TableId schemaTableId : schema.tableIds()) { - final Table table = schema.tableFor(schemaTableId); - final Attribute objectId = table.attributeWithName("OBJECT_ID"); - final Attribute dataObjectId = table.attributeWithName("DATA_OBJECT_ID"); - if (objectId != null && dataObjectId != null) { - if (row.getObjectId() == objectId.asLong() && row.getDataObjectId() == dataObjectId.asLong()) { - return table.id(); - } - } + final TableId resolvedTableId = schema.getTableIdByObjectId(row.getObjectId(), row.getDataObjectId()); + if (resolvedTableId != null) { + return resolvedTableId; } throw new DebeziumException("Failed to resolve UNKNOWN table name by object id lookup"); } @@ -1961,29 +1950,6 @@ protected int oldestTransactionComparison(T first, T second) { return comparison; } - /** - * Automatically maintains a cache across the mining step of table object id to {@link TableId}. - * This prevents the need to do expensive iteration loops when multiple changes for a given object id - * require the same lookup, this can be optimized using the cache. - * - * @param objectId the table object id to lookup - * @return the matching {@link TableId} if found, or {@code null} if not match found - */ - private TableId getTableIdByObjectId(Long objectId) { - return objectIdToTableIdCache.computeIfAbsent(objectId, (tableObjectId) -> { - for (TableId tableId : schema.tableIds()) { - final Table table = schema.tableFor(tableId); - final Attribute attribute = table.attributeWithName("OBJECT_ID"); - if (attribute != null && attribute.asLong().equals(tableObjectId)) { - LOGGER.debug("Table lookup for object id {} resolved to '{}'", tableObjectId, table.id()); - return table.id(); - } - } - LOGGER.debug("Table lookup for object id {} did not find a match.", tableObjectId); - return null; - }); - } - /** * Wrapper for all counter variables * diff --git a/debezium-core/src/main/java/io/debezium/util/LRUCacheMap.java b/debezium-core/src/main/java/io/debezium/util/LRUCacheMap.java index c7515ab44..730b97200 100644 --- a/debezium-core/src/main/java/io/debezium/util/LRUCacheMap.java +++ b/debezium-core/src/main/java/io/debezium/util/LRUCacheMap.java @@ -9,6 +9,7 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; +import java.util.function.Function; import org.apache.kafka.common.cache.Cache; @@ -66,4 +67,8 @@ public Collection values() { public String toString() { return delegate.toString(); } + + public V computeIfAbsent(K key, Function mappingFunction) { + return delegate.computeIfAbsent(key, mappingFunction); + } }