DBZ-8071 Improve hybrid mining performance with object-id-to-table-id cache
This commit is contained in:
parent
7c989f1215
commit
f4a2854c85
@ -112,6 +112,7 @@ public abstract class AbstractLogMinerEventProcessor<T extends Transaction> impl
|
|||||||
private Instant lastProcessedScnChangeTime = null;
|
private Instant lastProcessedScnChangeTime = null;
|
||||||
private Scn lastProcessedScn = Scn.NULL;
|
private Scn lastProcessedScn = Scn.NULL;
|
||||||
private boolean sequenceUnavailable = false;
|
private boolean sequenceUnavailable = false;
|
||||||
|
private HashMap<Long, TableId> objectIdToTableIdCache = new HashMap<>();
|
||||||
|
|
||||||
private final Set<String> abandonedTransactionsCache = new HashSet<>();
|
private final Set<String> abandonedTransactionsCache = new HashSet<>();
|
||||||
private final InMemoryPendingTransactionsCache inMemoryPendingTransactionsCache = new InMemoryPendingTransactionsCache();
|
private final InMemoryPendingTransactionsCache inMemoryPendingTransactionsCache = new InMemoryPendingTransactionsCache();
|
||||||
@ -269,6 +270,9 @@ protected boolean isTrxIdRawValue() {
|
|||||||
public Scn process(Scn startScn, Scn endScn) throws SQLException, InterruptedException {
|
public Scn process(Scn startScn, Scn endScn) throws SQLException, InterruptedException {
|
||||||
counters.reset();
|
counters.reset();
|
||||||
|
|
||||||
|
// Recreate the cache on each iteration
|
||||||
|
objectIdToTableIdCache = new HashMap<>();
|
||||||
|
|
||||||
try (PreparedStatement statement = createQueryStatement()) {
|
try (PreparedStatement statement = createQueryStatement()) {
|
||||||
LOGGER.debug("Fetching results for SCN [{}, {}]", startScn, endScn);
|
LOGGER.debug("Fetching results for SCN [{}, {}]", startScn, endScn);
|
||||||
statement.setFetchSize(getConfig().getQueryFetchSize());
|
statement.setFetchSize(getConfig().getQueryFetchSize());
|
||||||
@ -482,23 +486,9 @@ protected void processRow(OraclePartition partition, LogMinerEventRow row) throw
|
|||||||
// Special use case where the table has been dropped and purged, and we are processing an
|
// Special use case where the table has been dropped and purged, and we are processing an
|
||||||
// old event for the table that comes prior to the drop.
|
// old event for the table that comes prior to the drop.
|
||||||
LOGGER.debug("Found DML for dropped table in history with object-id based table name {}.", row.getTableId().table());
|
LOGGER.debug("Found DML for dropped table in history with object-id based table name {}.", row.getTableId().table());
|
||||||
for (TableId tableId : schema.tableIds()) {
|
final TableId tableId = getTableIdByObjectId(row.getObjectId());
|
||||||
LOGGER.debug("Processing table id '{}'", tableId);
|
if (tableId != null) {
|
||||||
Table table = schema.tableFor(tableId);
|
row.setTableId(tableId);
|
||||||
if (LOGGER.isDebugEnabled()) {
|
|
||||||
for (Attribute attribute : table.attributes()) {
|
|
||||||
LOGGER.debug("Attribute {} with value {}", attribute.name(), attribute.value());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Attribute attribute = table.attributeWithName("OBJECT_ID");
|
|
||||||
if (attribute != null) {
|
|
||||||
LOGGER.debug("Found table '{}' with object id {}", table.id(), attribute.asLong());
|
|
||||||
}
|
|
||||||
if (attribute != null && attribute.asLong().equals(row.getObjectId())) {
|
|
||||||
LOGGER.debug("Table lookup resolved to '{}'", table.id());
|
|
||||||
row.setTableId(table.id());
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!tableFilter.isIncluded(row.getTableId())) {
|
if (!tableFilter.isIncluded(row.getTableId())) {
|
||||||
@ -1971,6 +1961,29 @@ protected int oldestTransactionComparison(T first, T second) {
|
|||||||
return comparison;
|
return comparison;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Automatically maintains a cache across the mining step of table object id to {@link TableId}.
|
||||||
|
* This prevents the need to do expensive iteration loops when multiple changes for a given object id
|
||||||
|
* require the same lookup, this can be optimized using the cache.
|
||||||
|
*
|
||||||
|
* @param objectId the table object id to lookup
|
||||||
|
* @return the matching {@link TableId} if found, or {@code null} if not match found
|
||||||
|
*/
|
||||||
|
private TableId getTableIdByObjectId(Long objectId) {
|
||||||
|
return objectIdToTableIdCache.computeIfAbsent(objectId, (tableObjectId) -> {
|
||||||
|
for (TableId tableId : schema.tableIds()) {
|
||||||
|
final Table table = schema.tableFor(tableId);
|
||||||
|
final Attribute attribute = table.attributeWithName("OBJECT_ID");
|
||||||
|
if (attribute != null && attribute.asLong().equals(tableObjectId)) {
|
||||||
|
LOGGER.debug("Table lookup for object id {} resolved to '{}'", tableObjectId, table.id());
|
||||||
|
return table.id();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOGGER.debug("Table lookup for object id {} did not find a match.", tableObjectId);
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wrapper for all counter variables
|
* Wrapper for all counter variables
|
||||||
*
|
*
|
||||||
|
Loading…
Reference in New Issue
Block a user