DBZ-8054 Refactor removeEventWithRowId to reduce scans of events

This commit is contained in:
Jeremy Ford 2024-07-26 21:06:08 -05:00 committed by Chris Cranford
parent 1be6121416
commit 1315842f1f
5 changed files with 57 additions and 95 deletions

View File

@ -13,9 +13,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -120,60 +118,43 @@ protected String getFirstActiveTransactionKey() {
@Override
protected void removeEventWithRowId(LogMinerEventRow row) {
List<String> eventKeys = getTransactionKeysWithPrefix(row.getTransactionId() + "-");
// locate the events based solely on XIDUSN and XIDSLT.
String basePrefix = getTransactionIdPrefix(row.getTransactionId());
List<String> eventKeysForBasePrefix = getTransactionKeysWithPrefix(basePrefix);
String transactionIdPrefix = row.getTransactionId() + "-";
// filter the existing list down to the events for the transaction
List<String> eventKeys = eventKeysForBasePrefix.stream()
.filter(k -> k.startsWith(transactionIdPrefix))
.toList();
if (eventKeys.isEmpty() && isTransactionIdWithNoSequence(row.getTransactionId())) {
// This means that Oracle LogMiner found an event that should be undone but its corresponding
// undo entry was read in a prior mining session and the transaction's sequence could not be
// resolved. In this case, lets locate the transaction based solely on XIDUSN and XIDSLT.
final String transactionPrefix = getTransactionIdPrefix(row.getTransactionId());
// resolved.
LOGGER.debug("Undo change refers to a transaction that has no explicit sequence, '{}'", row.getTransactionId());
LOGGER.debug("Checking all transactions with prefix '{}'", transactionPrefix);
eventKeys = getTransactionKeysWithPrefix(transactionPrefix);
if (!eventKeys.isEmpty()) {
// Enforce that the keys are always reverse sorted.
eventKeys.sort(EventKeySortComparator.INSTANCE.reversed());
for (String eventKey : eventKeys) {
final LogMinerEvent event = getEventCache().get(eventKey);
if (event != null && event.getRowId().equals(row.getRowId())) {
Loggings.logDebugAndTraceRecord(LOGGER, row, "Undo change on table '{}' applied to transaction '{}'", row.getTableId(), eventKey);
getEventCache().remove(eventKey);
inMemoryPendingTransactionsCache.decrement(row.getTransactionId());
return;
}
}
Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found.", row.getTableId(),
row.getRowId());
}
else if (!getConfig().isLobEnabled()) {
Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since transaction '{}' was not found.", row.getTableId(),
row.getTransactionId());
}
LOGGER.debug("Checking all transactions with prefix '{}'", basePrefix);
eventKeys = eventKeysForBasePrefix;
}
else {
// Enforce that the keys are always reverse sorted.
eventKeys.sort(EventKeySortComparator.INSTANCE.reversed());
for (String eventKey : eventKeys) {
final LogMinerEvent event = getEventCache().get(eventKey);
if (event != null && event.getRowId().equals(row.getRowId())) {
LOGGER.debug("Undo applied for event {}.", event);
getEventCache().remove(eventKey);
inMemoryPendingTransactionsCache.decrement(row.getTransactionId());
return;
}
}
Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found.", row.getTableId(),
row.getRowId());
if (!eventKeys.isEmpty()) {
removeEvents(row, eventKeys);
}
else if (!getConfig().isLobEnabled()) {
Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since transaction '{}' was not found.", row.getTableId(),
row.getTransactionId());
}
}
protected List<String> getTransactionKeysWithPrefix(String prefix) {
AtomicReference<List<String>> result = new AtomicReference<>();
getEventCache().keys(stream -> {
result.set(stream.filter(k -> k.startsWith(prefix)).collect(Collectors.toList()));
});
return result.get();
// Enforce that the keys are always reverse sorted.
return getEventCache()
.streamAndReturn(stream -> stream.map(LogMinerCache.Entry::getKey)
.filter(k -> k.startsWith(prefix))
.sorted(EventKeySortComparator.INSTANCE.reversed())
.toList());
}
@Override
@ -234,7 +215,7 @@ protected void resetTransactionToStart(T transaction) {
@Override
protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedException {
super.handleSchemaChange(row);
if (row.getTableName() != null) {
if (row.getTableName() != null && getConfig().isLobEnabled()) {
getSchemaChangesCache().put(row.getScn().toString(), row.getTableId().identifier());
}
}
@ -302,7 +283,7 @@ private void removeEventsWithTransaction(T transaction) {
*/
private static class EventKeySortComparator implements Comparator<String> {
public static EventKeySortComparator INSTANCE = new EventKeySortComparator();
public static final EventKeySortComparator INSTANCE = new EventKeySortComparator();
@Override
public int compare(String o1, String o2) {
@ -432,4 +413,18 @@ public LogMinerEvent next() {
}
};
}
private void removeEvents(LogMinerEventRow row, List<String> eventKeys) {
for (String eventKey : eventKeys) {
final LogMinerEvent event = getEventCache().get(eventKey);
if (event != null && event.getRowId().equals(row.getRowId())) {
Loggings.logDebugAndTraceRecord(LOGGER, row, "Undo change on table '{}' applied to transaction '{}'", row.getTableId(), eventKey);
getEventCache().remove(eventKey);
inMemoryPendingTransactionsCache.decrement(row.getTransactionId());
return;
}
}
Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found.", row.getTableId(),
row.getRowId());
}
}

View File

@ -17,21 +17,19 @@ public class InMemoryPendingTransactionsCache {
*/
private final Map<String, Integer> pendingTransactionInEventsCache = new HashMap<>();
public Integer getNumPending(String transactionId) {
public int getNumPending(String transactionId) {
return pendingTransactionInEventsCache.getOrDefault(transactionId, 0);
}
public String putOrIncrement(String transactionId) {
final Integer i = pendingTransactionInEventsCache.getOrDefault(transactionId, 0);
pendingTransactionInEventsCache.put(transactionId, i + 1);
return transactionId;
public void putOrIncrement(String transactionId) {
pendingTransactionInEventsCache.compute(transactionId, (k, value) -> {
value = value == null ? 0 : value;
return value + 1;
});
}
public void decrement(String transactionId) {
final int i = pendingTransactionInEventsCache.getOrDefault(transactionId, 0);
if (i > 0) {
pendingTransactionInEventsCache.put(transactionId, i - 1);
}
pendingTransactionInEventsCache.compute(transactionId, (k, value) -> value == null || value == 0 ? 0 : value - 1);
}
public void initKey(String transactionId, int count) {

View File

@ -5,6 +5,7 @@
*/
package io.debezium.connector.oracle.logminer.processor.memory;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@ -18,8 +19,8 @@ public class MemoryBasedLogMinerCache<K, V> implements LogMinerCache<K, V> {
private final Map<K, V> map;
public MemoryBasedLogMinerCache(Map<K, V> map) {
this.map = map;
public MemoryBasedLogMinerCache() {
this.map = new HashMap<>();
}
@Override

View File

@ -5,8 +5,6 @@
*/
package io.debezium.connector.oracle.logminer.processor.memory;
import java.util.HashMap;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
@ -30,13 +28,10 @@
*/
public class MemoryLogMinerEventProcessor extends AbstractTransactionCachingLogMinerEventProcessor<MemoryTransaction> {
/**
* Cache of transactions, keyed based on the transaction's unique identifier
*/
private final LogMinerCache<String, MemoryTransaction> transactionCache = new MemoryBasedLogMinerCache<>(new HashMap<>());
private final LogMinerCache<String, LogMinerEvent> eventCache = new MemoryBasedLogMinerCache<>(new HashMap<>());
private final LogMinerCache<String, String> schemaCache = new MemoryBasedLogMinerCache<>(new HashMap<>());
private final LogMinerCache<String, String> processedTransactionsCache = new MemoryBasedLogMinerCache<>(new HashMap<>());
private final LogMinerCache<String, MemoryTransaction> transactionCache = new MemoryBasedLogMinerCache<>();
private final LogMinerCache<String, LogMinerEvent> eventCache = new MemoryBasedLogMinerCache<>();
private final LogMinerCache<String, String> schemaCache = new MemoryBasedLogMinerCache<>();
private final LogMinerCache<String, String> processedTransactionsCache = new MemoryBasedLogMinerCache<>();
public MemoryLogMinerEventProcessor(ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,

View File

@ -6,14 +6,8 @@
package io.debezium.connector.oracle.logminer.processor.memory;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.processor.AbstractTransaction;
/**
@ -23,14 +17,10 @@
*/
public class MemoryTransaction extends AbstractTransaction {
private static final Logger LOGGER = LoggerFactory.getLogger(MemoryTransaction.class);
private int numberOfEvents;
private List<LogMinerEvent> events;
public MemoryTransaction(String transactionId, Scn startScn, Instant changeTime, String userName, Integer redoThreadId) {
super(transactionId, startScn, changeTime, userName, redoThreadId);
this.events = new ArrayList<>();
start();
}
@ -49,23 +39,6 @@ public void start() {
numberOfEvents = 0;
}
public List<LogMinerEvent> getEvents() {
return events;
}
public boolean removeEventWithRowId(String rowId) {
// Should always iterate from the back of the event queue and remove the last that matches row-id.
for (int i = events.size() - 1; i >= 0; i--) {
final LogMinerEvent event = events.get(i);
if (event.getRowId().equals(rowId)) {
events.remove(i);
LOGGER.trace("Undo applied for event {}.", event);
return true;
}
}
return false;
}
@Override
public String toString() {
return "MemoryTransaction{" +