diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java index 05c0d61c6..8399d494b 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java @@ -112,6 +112,7 @@ public abstract class AbstractLogMinerEventProcessor impl private Instant lastProcessedScnChangeTime = null; private Scn lastProcessedScn = Scn.NULL; private boolean sequenceUnavailable = false; + private HashMap objectIdToTableIdCache = new HashMap<>(); private final Set abandonedTransactionsCache = new HashSet<>(); private final InMemoryPendingTransactionsCache inMemoryPendingTransactionsCache = new InMemoryPendingTransactionsCache(); @@ -269,6 +270,9 @@ protected boolean isTrxIdRawValue() { public Scn process(Scn startScn, Scn endScn) throws SQLException, InterruptedException { counters.reset(); + // Recreate the cache on each iteration + objectIdToTableIdCache = new HashMap<>(); + try (PreparedStatement statement = createQueryStatement()) { LOGGER.debug("Fetching results for SCN [{}, {}]", startScn, endScn); statement.setFetchSize(getConfig().getQueryFetchSize()); @@ -482,23 +486,9 @@ protected void processRow(OraclePartition partition, LogMinerEventRow row) throw // Special use case where the table has been dropped and purged, and we are processing an // old event for the table that comes prior to the drop. 
LOGGER.debug("Found DML for dropped table in history with object-id based table name {}.", row.getTableId().table()); - for (TableId tableId : schema.tableIds()) { - LOGGER.debug("Processing table id '{}'", tableId); - Table table = schema.tableFor(tableId); - if (LOGGER.isDebugEnabled()) { - for (Attribute attribute : table.attributes()) { - LOGGER.debug("Attribute {} with value {}", attribute.name(), attribute.value()); - } - } - Attribute attribute = table.attributeWithName("OBJECT_ID"); - if (attribute != null) { - LOGGER.debug("Found table '{}' with object id {}", table.id(), attribute.asLong()); - } - if (attribute != null && attribute.asLong().equals(row.getObjectId())) { - LOGGER.debug("Table lookup resolved to '{}'", table.id()); - row.setTableId(table.id()); - break; - } + final TableId tableId = getTableIdByObjectId(row.getObjectId()); + if (tableId != null) { + row.setTableId(tableId); + } } if (!tableFilter.isIncluded(row.getTableId())) { @@ -1971,6 +1961,29 @@ protected int oldestTransactionComparison(T first, T second) { return comparison; } + /** + * Automatically maintains a cache across the mining step of table object id to {@link TableId}. + * This prevents the need to do expensive iteration loops when multiple changes for a given object id + * require the same lookup; repeated lookups are served from the cache instead.
+ * + * @param objectId the table object id to lookup + * @return the matching {@link TableId} if found, or {@code null} if no match is found + */ + private TableId getTableIdByObjectId(Long objectId) { + return objectIdToTableIdCache.computeIfAbsent(objectId, (tableObjectId) -> { + for (TableId tableId : schema.tableIds()) { + final Table table = schema.tableFor(tableId); + final Attribute attribute = table.attributeWithName("OBJECT_ID"); + if (attribute != null && attribute.asLong().equals(tableObjectId)) { + LOGGER.debug("Table lookup for object id {} resolved to '{}'", tableObjectId, table.id()); + return table.id(); + } + } + LOGGER.debug("Table lookup for object id {} did not find a match.", tableObjectId); + return null; + }); + } + /** + * Wrapper for all counter variables *