Refactor caching into common base class
Provide new LogMinerCache interface for swapping out alternative caches
Jeremy Ford 2024-06-18 19:26:09 -04:00 committed by Chris Cranford
parent fb94614419
commit ff828a340e
13 changed files with 903 additions and 697 deletions

View File

@ -23,9 +23,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.infinispan.commons.util.CloseableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -81,7 +79,7 @@
*
* @author Chris Cranford
*/
public abstract class AbstractLogMinerEventProcessor<T extends AbstractTransaction> implements LogMinerEventProcessor {
public abstract class AbstractLogMinerEventProcessor<T extends Transaction> implements LogMinerEventProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLogMinerEventProcessor.class);
private static final String NO_SEQUENCE_TRX_ID_SUFFIX = "ffffffff";
@ -113,14 +111,14 @@ public abstract class AbstractLogMinerEventProcessor<T extends AbstractTransacti
private final Set<String> abandonedTransactionsCache = new HashSet<>();
public AbstractLogMinerEventProcessor(ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
OracleDatabaseSchema schema,
OraclePartition partition,
OracleOffsetContext offsetContext,
EventDispatcher<OraclePartition, TableId> dispatcher,
LogMinerStreamingChangeEventSourceMetrics metrics,
OracleConnection jdbcConnection) {
protected AbstractLogMinerEventProcessor(ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
OracleDatabaseSchema schema,
OraclePartition partition,
OracleOffsetContext offsetContext,
EventDispatcher<OraclePartition, TableId> dispatcher,
LogMinerStreamingChangeEventSourceMetrics metrics,
OracleConnection jdbcConnection) {
this.context = context;
this.connectorConfig = connectorConfig;
this.schema = schema;
@ -192,7 +190,7 @@ protected Instant getLastProcessedScnChangeTime() {
* Returns the {@code TransactionCache} implementation.
* @return the transaction cache, never {@code null}
*/
protected abstract Map<String, T> getTransactionCache();
protected abstract LogMinerCache<String, T> getTransactionCache();
/**
* Creates a new transaction based on the supplied {@code START} event.
@ -260,11 +258,11 @@ public Scn process(Scn startScn, Scn endScn) throws SQLException, InterruptedExc
if (metrics.getNumberOfActiveTransactions() > 0 && LOGGER.isDebugEnabled()) {
// Accessing the values through the cache callback guarantees the underlying stream is closed, which matters for Infinispan performance
try (Stream<T> stream = getTransactionCache().values().stream()) {
getTransactionCache().values(values -> {
LOGGER.debug("All active transactions: {}",
stream.map(t -> t.getTransactionId() + " (" + t.getStartScn() + ")")
values.map(t -> t.getTransactionId() + " (" + t.getStartScn() + ")")
.collect(Collectors.joining(",")));
}
});
}
metrics.setLastProcessedRowsCount(counters.rows);
@ -1476,7 +1474,7 @@ private ParsedLobWriteSql parseLobWriteSql(String sql) {
return new ParsedLobWriteSql(offset, length, data);
}
private class ParsedLobWriteSql {
private static class ParsedLobWriteSql {
final int offset;
final int length;
final String data;
@ -1556,44 +1554,38 @@ public void abandonTransactions(Duration retention) throws InterruptedException
Scn thresholdScn = lastScnToAbandonTransactions.get();
Scn smallestScn = getTransactionCacheMinimumScn();
if (!smallestScn.isNull() && thresholdScn.compareTo(smallestScn) >= 0) {
LogMinerCache<String, T> transactionCache = getTransactionCache();
Map<String, T> abandonedTransactions = transactionCache.streamAndReturn(stream -> stream
.filter(e -> e.getValue().getStartScn().compareTo(thresholdScn) <= 0)
.collect(Collectors.toMap(LogMinerCache.Entry::getKey, LogMinerCache.Entry::getValue)));
boolean first = true;
Iterator<Map.Entry<String, T>> iterator = getTransactionCache().entrySet().iterator();
try {
while (iterator.hasNext()) {
Map.Entry<String, T> entry = iterator.next();
if (entry.getValue().getStartScn().compareTo(thresholdScn) <= 0) {
if (first) {
LOGGER.warn("All transactions with SCN <= {} will be abandoned.", thresholdScn);
if (LOGGER.isDebugEnabled()) {
try (Stream<String> s = getTransactionCache().keySet().stream()) {
LOGGER.debug("List of transactions in the cache before transactions being abandoned: [{}]",
s.collect(Collectors.joining(",")));
}
}
first = false;
}
LOGGER.warn("Transaction {} (start SCN {}, change time {}, redo thread {}, {} events) is being abandoned.",
entry.getKey(), entry.getValue().getStartScn(), entry.getValue().getChangeTime(),
entry.getValue().getRedoThreadId(), entry.getValue().getNumberOfEvents());
cleanupAfterTransactionRemovedFromCache(entry.getValue(), true);
iterator.remove();
metrics.addAbandonedTransactionId(entry.getKey());
metrics.setActiveTransactionCount(getTransactionCache().size());
}
}
}
finally {
if (iterator instanceof CloseableIterator) {
((CloseableIterator<Map.Entry<String, T>>) iterator).close();
for (Map.Entry<String, T> entry : abandonedTransactions.entrySet()) {
if (first) {
LOGGER.warn("All transactions with SCN <= {} will be abandoned.", thresholdScn);
first = false;
}
String key = entry.getKey();
T value = entry.getValue();
LOGGER.warn("Transaction {} (start SCN {}, change time {}, redo thread {}, {} events) is being abandoned.",
key, value.getStartScn(), value.getChangeTime(),
value.getRedoThreadId(), value.getNumberOfEvents());
cleanupAfterTransactionRemovedFromCache(value, true);
transactionCache.remove(key);
metrics.addAbandonedTransactionId(key);
metrics.setActiveTransactionCount(transactionCache.size());
}
if (LOGGER.isDebugEnabled()) {
try (Stream<String> s = getTransactionCache().keySet().stream()) {
LOGGER.debug("List of transactions in the cache after transactions being abandoned: [{}]",
s.collect(Collectors.joining(",")));
}
LOGGER.debug("List of transactions in the cache before transactions being abandoned: [{}]",
String.join(",", abandonedT.keySet()));
transactionCache.keys(keys -> LOGGER.debug("List of transactions in the cache after transactions being abandoned: [{}]",
keys.collect(Collectors.joining(","))));
}
// Update the oldest scn metric after transaction abandonment
@ -1661,23 +1653,27 @@ private Scn getLastScnToAbandonFallbackByTransactionChangeTime(Duration retentio
LOGGER.debug("Getting abandon SCN breakpoint based on change time {} (retention {} minutes).",
getLastProcessedScnChangeTime(), retention.toMinutes());
Scn calculatedLastScn = Scn.NULL;
for (Transaction transaction : getTransactionCache().values()) {
final Instant changeTime = transaction.getChangeTime();
final long diffMinutes = Duration.between(getLastProcessedScnChangeTime(), changeTime).abs().toMinutes();
if (diffMinutes > 0 && diffMinutes > retention.toMinutes()) {
// We either now will capture the transaction's SCN because it is the first detected transaction
// outside the configured retention period or the transaction has a start SCN that is more recent
// than the current calculated SCN but is still outside the configured retention period.
LOGGER.debug("Transaction {} with SCN {} started at {}, age is {} minutes.",
transaction.getTransactionId(), transaction.getStartScn(), changeTime, diffMinutes);
if (calculatedLastScn.isNull() || calculatedLastScn.compareTo(transaction.getStartScn()) < 0) {
calculatedLastScn = transaction.getStartScn();
}
}
}
return calculatedLastScn;
return getTransactionCache().streamAndReturn(stream -> {
return stream.map(LogMinerCache.Entry::getValue)
.filter(t -> {
final Instant changeTime = t.getChangeTime();
final long diffMinutes = Duration.between(getLastProcessedScnChangeTime(), changeTime).abs().toMinutes();
// We either now will capture the transaction's SCN because it is the first detected transaction
// outside the configured retention period or the transaction has a start SCN that is more recent
// than the current calculated SCN but is still outside the configured retention period.
LOGGER.debug("Transaction {} with SCN {} started at {}, age is {} minutes.",
t.getTransactionId(), t.getStartScn(), changeTime, diffMinutes);
return diffMinutes > 0 && diffMinutes > retention.toMinutes();
})
.max(this::compareStartScn)
.map(Transaction::getStartScn)
.orElse(Scn.NULL);
});
}
protected int compareStartScn(T first, T second) {
return first.getStartScn().compareTo(second.getStartScn());
}
/**

View File

@ -0,0 +1,446 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
import io.debezium.relational.TableId;
import io.debezium.util.Loggings;
/**
* An abstract {@link LogMinerEventProcessor} implementation that stores transaction state in
* {@link LogMinerCache} instances, allowing alternative cache implementations to be plugged in.
*
* @author Chris Cranford
*/
public abstract class AbstractTransactionCachingLogMinerEventProcessor<T extends Transaction> extends AbstractLogMinerEventProcessor<T> implements CacheProvider<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTransactionCachingLogMinerEventProcessor.class);
private final OracleConnection jdbcConnection;
private final LogMinerStreamingChangeEventSourceMetrics metrics;
private final OraclePartition partition;
private final OracleOffsetContext offsetContext;
private final EventDispatcher<OraclePartition, TableId> dispatcher;
private final InMemoryPendingTransactionsCache inMemoryPendingTransactionsCache = new InMemoryPendingTransactionsCache();
protected AbstractTransactionCachingLogMinerEventProcessor(
ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
OracleConnection jdbcConnection,
EventDispatcher<OraclePartition, TableId> dispatcher,
OraclePartition partition,
OracleOffsetContext offsetContext,
OracleDatabaseSchema schema,
LogMinerStreamingChangeEventSourceMetrics metrics) {
super(context, connectorConfig, schema, partition, offsetContext, dispatcher, metrics, jdbcConnection);
this.jdbcConnection = jdbcConnection;
this.metrics = metrics;
this.partition = partition;
this.offsetContext = offsetContext;
this.dispatcher = dispatcher;
}
protected void reCreateInMemoryCache() {
getTransactionCache().keys(trStream -> {
trStream.forEach(tr -> {
getEventCache().keys(eventStream -> {
int count = (int) eventStream.filter(e -> e.startsWith(tr + "-")).count();
LOGGER.info("Re-creating in memory cache of event count for transaction '" + tr + "'. No of events found: " + count);
inMemoryPendingTransactionsCache.initKey(tr, count);
});
});
});
}
@Override
public void displayCacheStatistics() {
LOGGER.info("Overall Cache Statistics:");
LOGGER.info("\tTransactions : {}", getTransactionCache().size());
LOGGER.info("\tRecent Transactions : {}", getProcessedTransactionsCache().size());
LOGGER.info("\tSchema Changes : {}", getSchemaChangesCache().size());
LOGGER.info("\tEvents : {}", getEventCache().size());
if (!getEventCache().isEmpty() && LOGGER.isDebugEnabled()) {
getEventCache().keys(stream -> {
stream.forEach(eventKey -> LOGGER.debug("\t\tFound Key: {}", eventKey));
});
}
}
@Override
protected boolean isRecentlyProcessed(String transactionId) {
return getProcessedTransactionsCache().containsKey(transactionId);
}
@Override
protected Scn getTransactionCacheMinimumScn() {
return getTransactionCache().streamAndReturn(stream -> stream.map(LogMinerCache.Entry::getValue)
.map(Transaction::getStartScn)
.min(Scn::compareTo)
.orElse(Scn.NULL));
}
protected Optional<T> getOldestTransactionInCache() {
return getTransactionCache().streamAndReturn(stream -> stream.map(LogMinerCache.Entry::getValue)
.min(this::compareStartScn));
}
@Override
protected String getFirstActiveTransactionKey() {
return getTransactionCache()
.streamAndReturn(stream -> stream.map(LogMinerCache.Entry::getKey).findFirst()).orElse(null);
}
@Override
protected void removeEventWithRowId(LogMinerEventRow row) {
List<String> eventKeys = getTransactionKeysWithPrefix(row.getTransactionId() + "-");
if (eventKeys.isEmpty() && isTransactionIdWithNoSequence(row.getTransactionId())) {
// This means that Oracle LogMiner found an event that should be undone but its corresponding
// undo entry was read in a prior mining session and the transaction's sequence could not be
// resolved. In this case, lets locate the transaction based solely on XIDUSN and XIDSLT.
final String transactionPrefix = getTransactionIdPrefix(row.getTransactionId());
LOGGER.debug("Undo change refers to a transaction that has no explicit sequence, '{}'", row.getTransactionId());
LOGGER.debug("Checking all transactions with prefix '{}'", transactionPrefix);
eventKeys = getTransactionKeysWithPrefix(transactionPrefix);
if (!eventKeys.isEmpty()) {
// Enforce that the keys are always reverse sorted.
eventKeys.sort(EventKeySortComparator.INSTANCE.reversed());
for (String eventKey : eventKeys) {
final LogMinerEvent event = getEventCache().get(eventKey);
if (event != null && event.getRowId().equals(row.getRowId())) {
Loggings.logDebugAndTraceRecord(LOGGER, row, "Undo change on table '{}' applied to transaction '{}'", row.getTableId(), eventKey);
getEventCache().remove(eventKey);
inMemoryPendingTransactionsCache.decrement(row.getTransactionId());
return;
}
}
Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found.", row.getTableId(),
row.getRowId());
}
else if (!getConfig().isLobEnabled()) {
Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since transaction '{}' was not found.", row.getTableId(),
row.getTransactionId());
}
}
else {
// Enforce that the keys are always reverse sorted.
eventKeys.sort(EventKeySortComparator.INSTANCE.reversed());
for (String eventKey : eventKeys) {
final LogMinerEvent event = getEventCache().get(eventKey);
if (event != null && event.getRowId().equals(row.getRowId())) {
LOGGER.debug("Undo applied for event {}.", event);
getEventCache().remove(eventKey);
inMemoryPendingTransactionsCache.decrement(row.getTransactionId());
return;
}
}
Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found.", row.getTableId(),
row.getRowId());
}
}
protected List<String> getTransactionKeysWithPrefix(String prefix) {
AtomicReference<List<String>> result = new AtomicReference<>();
getEventCache().keys(stream -> {
result.set(stream.filter(k -> k.startsWith(prefix)).collect(Collectors.toList()));
});
return result.get();
}
@Override
protected void processRow(OraclePartition partition, LogMinerEventRow row) throws SQLException, InterruptedException {
final String transactionId = row.getTransactionId();
if (isRecentlyProcessed(transactionId)) {
LOGGER.debug("Transaction {} has been seen by connector, skipped.", transactionId);
return;
}
super.processRow(partition, row);
}
@Override
protected boolean hasSchemaChangeBeenSeen(LogMinerEventRow row) {
return getSchemaChangesCache().containsKey(row.getScn().toString());
}
@Override
protected T getAndRemoveTransactionFromCache(String transactionId) {
// The remote Infinispan cache's remove() previously returned null instead of the removed
// value; that workaround now lives in InfinispanLogMinerCache, which requests
// Flag.FORCE_RETURN_VALUE for remote caches, so a plain remove() is sufficient here.
return getTransactionCache().remove(transactionId);
}
@Override
protected void cleanupAfterTransactionRemovedFromCache(T transaction, boolean isAbandoned) {
super.cleanupAfterTransactionRemovedFromCache(transaction, isAbandoned);
removeEventsWithTransaction(transaction);
}
@Override
protected void finalizeTransactionCommit(String transactionId, Scn commitScn) {
getAbandonedTransactionsCache().remove(transactionId);
// cache recently committed transactions by transaction id
if (getConfig().isLobEnabled()) {
getProcessedTransactionsCache().put(transactionId, commitScn.toString());
}
}
@Override
protected void finalizeTransactionRollback(String transactionId, Scn rollbackScn) {
final T transaction = getTransactionCache().get(transactionId);
if (transaction != null) {
removeEventsWithTransaction(transaction);
getTransactionCache().remove(transactionId);
}
getAbandonedTransactionsCache().remove(transactionId);
if (getConfig().isLobEnabled()) {
getProcessedTransactionsCache().put(transactionId, rollbackScn.toString());
}
}
@Override
protected void resetTransactionToStart(T transaction) {
super.resetTransactionToStart(transaction);
// Flush the change created by the super class to the transaction cache
getTransactionCache().put(transaction.getTransactionId(), transaction);
}
@Override
protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedException {
super.handleSchemaChange(row);
if (row.getTableName() != null) {
getSchemaChangesCache().put(row.getScn().toString(), row.getTableId().identifier());
}
}
@Override
protected void addToTransaction(String transactionId, LogMinerEventRow row, Supplier<LogMinerEvent> eventSupplier) {
if (getAbandonedTransactionsCache().contains(transactionId)) {
LOGGER.warn("Event for abandoned transaction {}, skipped.", transactionId);
return;
}
if (!isRecentlyProcessed(transactionId)) {
T transaction = getTransactionCache().get(transactionId);
if (transaction == null) {
LOGGER.trace("Transaction {} is not in cache, creating.", transactionId);
transaction = createTransaction(row);
}
if (isTransactionOverEventThreshold(transaction)) {
abandonTransactionOverEventThreshold(transaction);
return;
}
String eventKey = transaction.getEventId(transaction.getNextEventId());
if (!getEventCache().containsKey(eventKey)) {
// Add new event at eventId offset
LOGGER.trace("Transaction {}, adding event reference at key {}", transactionId, eventKey);
getEventCache().put(eventKey, eventSupplier.get());
metrics.calculateLagFromSource(row.getChangeTime());
inMemoryPendingTransactionsCache.putOrIncrement(transaction.getTransactionId());
}
// When using Infinispan, this extra put is required so that the state is properly synchronized
getTransactionCache().put(transactionId, transaction);
metrics.setActiveTransactionCount(getTransactionCache().size());
}
else {
LOGGER.warn("Event for transaction {} skipped as transaction has been processed.", transactionId);
}
}
@Override
protected int getTransactionEventCount(T transaction) {
return inMemoryPendingTransactionsCache.getNumPending(transaction.getTransactionId());
}
@Override
protected PreparedStatement createQueryStatement() throws SQLException {
return jdbcConnection.connection().prepareStatement(getQueryString(),
ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY,
ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
private void removeEventsWithTransaction(T transaction) {
// Clear the event queue for the transaction
for (int i = 0; i < transaction.getNumberOfEvents(); ++i) {
getEventCache().remove(transaction.getEventId(i));
}
inMemoryPendingTransactionsCache.remove(transaction.getTransactionId());
}
/**
* A comparator that sorts event keys by their numeric event index rather than lexicographically,
* so that "9" correctly precedes "100" instead of the plain string ordering that would put "100" first.
*/
private static class EventKeySortComparator implements Comparator<String> {
public static final EventKeySortComparator INSTANCE = new EventKeySortComparator();
@Override
public int compare(String o1, String o2) {
if (o1 == null || !o1.contains("-")) {
throw new IllegalStateException("Event Key must be in the format of <transaction>-<event>");
}
if (o2 == null || !o2.contains("-")) {
throw new IllegalStateException("Event Key must be in the format of <transaction>-<event>");
}
final String[] s1 = o1.split("-");
final String[] s2 = o2.split("-");
// Compare transaction ids, these should generally be identical.
int result = s1[0].compareTo(s2[0]);
if (result == 0) {
result = Long.compare(Long.parseLong(s1[1]), Long.parseLong(s2[1]));
}
return result;
}
}
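For illustration, a quick sketch of the ordering this comparator produces, using a hypothetical transaction id and event indices:

// Event keys combine a transaction id with a numeric event index, e.g. "0a000200-9"
List<String> keys = new ArrayList<>(List.of("0a000200-100", "0a000200-9", "0a000200-2"));
keys.sort(EventKeySortComparator.INSTANCE);
// Numeric ordering on the index: [0a000200-2, 0a000200-9, 0a000200-100]
keys.sort(EventKeySortComparator.INSTANCE.reversed());
// Reverse ordering, as used above when searching for events to undo: [0a000200-100, 0a000200-9, 0a000200-2]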
/**
* Purge the necessary caches with all entries that occurred prior to the specified change number.
* <p>
* With the {@link LogMinerCache#removeIf} abstraction, this logic is now shared by the embedded
* and remote cache implementations instead of being duplicated in each concrete subclass.
*
* @param minCacheScn the minimum system change number to keep entries until
*/
protected void purgeCache(Scn minCacheScn) {
getProcessedTransactionsCache().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0);
getSchemaChangesCache().removeIf(entry -> Scn.valueOf(entry.getKey()).compareTo(minCacheScn) < 0);
}
@Override
protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws InterruptedException {
// Cleanup caches based on current state of the transaction cache
final Optional<T> oldestTransaction = getOldestTransactionInCache();
final Scn minCacheScn;
final Instant minCacheScnChangeTime;
if (oldestTransaction.isPresent()) {
minCacheScn = oldestTransaction.get().getStartScn();
minCacheScnChangeTime = oldestTransaction.get().getChangeTime();
}
else {
minCacheScn = Scn.NULL;
minCacheScnChangeTime = null;
}
if (!minCacheScn.isNull()) {
abandonTransactions(getConfig().getLogMiningTransactionRetention());
purgeCache(minCacheScn);
}
else {
getSchemaChangesCache().removeIf(e -> true);
}
if (getConfig().isLobEnabled()) {
if (getTransactionCache().isEmpty() && !maxCommittedScn.isNull()) {
offsetContext.setScn(maxCommittedScn);
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
}
else {
if (!minCacheScn.isNull()) {
getProcessedTransactionsCache().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0);
offsetContext.setScn(minCacheScn.subtract(Scn.valueOf(1)));
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
}
}
return offsetContext.getScn();
}
else {
if (!getLastProcessedScn().isNull() && getLastProcessedScn().compareTo(endScn) < 0) {
// If the last processed SCN is before the endScn we need to use the last processed SCN as the
// next starting point as the LGWR buffer didn't flush all entries from memory to disk yet.
endScn = getLastProcessedScn();
}
// update offsets
offsetContext.setScn(endScn);
metrics.setOldestScnDetails(minCacheScn, minCacheScnChangeTime);
metrics.setOffsetScn(endScn);
// optionally dispatch a heartbeat event
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
return endScn;
}
}
@Override
protected Iterator<LogMinerEvent> getTransactionEventIterator(T transaction) {
return new Iterator<>() {
private final int count = transaction.getNumberOfEvents();
private LogMinerEvent nextEvent;
private int index = 0;
@Override
public boolean hasNext() {
while (index < count) {
nextEvent = getEventCache().get(transaction.getEventId(index));
if (nextEvent == null) {
LOGGER.debug("Event {} must have been undone, skipped.", index);
// There are situations where an event will be removed from the cache when it is
// undone by the undo-row flag. The event id isn't re-used in this use case so
// the iterator automatically detects null entries and skips them by advancing
// to the next entry until either we've reached the number of events or detected
// a non-null entry available for return
index++;
continue;
}
break;
}
return index < count;
}
@Override
public LogMinerEvent next() {
index++;
return nextEvent;
}
};
}
}

View File

@ -3,16 +3,14 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.infinispan;
import org.infinispan.commons.api.BasicCache;
package io.debezium.connector.oracle.logminer.processor;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
/**
* @author Chris Cranford
*/
public interface CacheProvider extends AutoCloseable {
public interface CacheProvider<T extends Transaction> extends AutoCloseable {
/**
* The name for the transaction cache
@ -49,7 +47,7 @@ public interface CacheProvider extends AutoCloseable {
*
* @return the transaction cache, never {@code null}
*/
BasicCache<String, InfinispanTransaction> getTransactionCache();
LogMinerCache<String, T> getTransactionCache();
/**
* Get the LogMiner events cache
@ -61,7 +59,7 @@ public interface CacheProvider extends AutoCloseable {
*
* @return the events cache, never {@code null}
*/
BasicCache<String, LogMinerEvent> getEventCache();
LogMinerCache<String, LogMinerEvent> getEventCache();
/**
* Get the Schema Changes cache
@ -73,7 +71,7 @@ public interface CacheProvider extends AutoCloseable {
*
* @return the schema changes cache, never {@code null}
*/
BasicCache<String, String> getSchemaChangesCache();
LogMinerCache<String, String> getSchemaChangesCache();
/**
* Get the processed transactions cache
@ -85,5 +83,5 @@ public interface CacheProvider extends AutoCloseable {
*
* @return the processed transactions cache, never {@code null}
*/
BasicCache<String, String> getProcessedTransactionsCache();
LogMinerCache<String, String> getProcessedTransactionsCache();
}
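To illustrate the swap point this interface creates, a minimal sketch of an alternative provider backed by plain heap maps might look like the following; MapLogMinerCache is a hypothetical helper (sketched after the LogMinerCache interface below) and is not part of this commit:

// Hypothetical provider serving all four caches from in-memory maps; assumes
// displayCacheStatistics() is declared on CacheProvider, as the overrides elsewhere suggest.
public class HeapCacheProvider<T extends Transaction> implements CacheProvider<T> {

    private final LogMinerCache<String, T> transactionCache = new MapLogMinerCache<>();
    private final LogMinerCache<String, LogMinerEvent> eventCache = new MapLogMinerCache<>();
    private final LogMinerCache<String, String> schemaChangesCache = new MapLogMinerCache<>();
    private final LogMinerCache<String, String> processedTransactionsCache = new MapLogMinerCache<>();

    @Override
    public LogMinerCache<String, T> getTransactionCache() {
        return transactionCache;
    }

    @Override
    public LogMinerCache<String, LogMinerEvent> getEventCache() {
        return eventCache;
    }

    @Override
    public LogMinerCache<String, String> getSchemaChangesCache() {
        return schemaChangesCache;
    }

    @Override
    public LogMinerCache<String, String> getProcessedTransactionsCache() {
        return processedTransactionsCache;
    }

    @Override
    public void displayCacheStatistics() {
        // Sketch: log the four cache sizes, mirroring the abstract processor
    }

    @Override
    public void close() {
        // Nothing to release for heap-backed maps
    }
}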

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.infinispan;
package io.debezium.connector.oracle.logminer.processor;
import java.util.HashMap;
import java.util.Map;
@ -11,23 +11,23 @@
/**
* An in-memory pending transaction cache, used for performance reasons.
*/
class InMemoryPendingTransactionsCache {
public class InMemoryPendingTransactionsCache {
/**
* Map of transaction ids to the number of events in the cache
*/
private final Map<String, Integer> pendingTransactionInEventsCache = new HashMap<>();
Integer getNumPending(String transactionId) {
public Integer getNumPending(String transactionId) {
return pendingTransactionInEventsCache.getOrDefault(transactionId, 0);
}
String putOrIncrement(String transactionId) {
public String putOrIncrement(String transactionId) {
final Integer i = pendingTransactionInEventsCache.getOrDefault(transactionId, 0);
pendingTransactionInEventsCache.put(transactionId, i + 1);
return transactionId;
}
void decrement(String transactionId) {
public void decrement(String transactionId) {
final int i = pendingTransactionInEventsCache.getOrDefault(transactionId, 0);
if (i > 0) {
pendingTransactionInEventsCache.put(transactionId, i - 1);

View File

@ -0,0 +1,104 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
/**
* Interface describing the functionality needed to cache data during LogMiner processing.
*/
public interface LogMinerCache<K, V> {
/**
* Consume a Stream of all the entries in the cache.
* @param entryStream the consumer that will receive a {@link Stream} of all entries in the cache
*/
void entries(Consumer<Stream<Entry<K, V>>> entryStream);
/**
* A Stream of available keys will be provided to the given Consumer.
*/
default void keys(Consumer<Stream<K>> keyStreamConsumer) {
entries(entryStream -> keyStreamConsumer.accept(entryStream.map(Entry::getKey)));
}
/**
* A Stream of available values will be provided to the given Consumer.
*/
default void values(Consumer<Stream<V>> valueStreamConsumer) {
entries(entryStream -> valueStreamConsumer.accept(entryStream.map(Entry::getValue)));
}
/**
* Clear all keys/values from the cache.
*/
void clear();
/**
* Retrieve the value for the given key.
*/
V get(K key);
/**
* Returns true if the cache is empty.
*/
boolean isEmpty();
/**
* Returns true if the cache contains the given key.
*/
boolean containsKey(K key);
/**
* Add the key and value into the cache.
*/
void put(K key, V value);
/**
* Remove the given key from the cache and return the value that was associated with it.
*/
V remove(K key);
/**
* Returns the size of the cache.
*/
int size();
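/**
* Perform the given action for each key/value pair in the cache.
*/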
void forEach(BiConsumer<K, V> action);
/**
* Remove all entries from the cache for which the {@link Predicate} returns true.
*/
void removeIf(Predicate<Entry<K, V>> predicate);
/**
* Apply the given function to the provided stream and return the result from the function.
*/
<T> T streamAndReturn(Function<Stream<Entry<K, V>>, T> function);
class Entry<K, V> {
private final K key;
private final V value;
public Entry(K key, V value) {
this.key = key;
this.value = value;
}
public K getKey() {
return key;
}
public V getValue() {
return value;
}
}
}
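For reference, a minimal heap-backed implementation of this interface could look as follows. This is a sketch (class name hypothetical, not part of this commit, assumed to live in the same package as LogMinerCache) showing the contract can be satisfied by a plain HashMap:

import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;

public class MapLogMinerCache<K, V> implements LogMinerCache<K, V> {

    private final Map<K, V> map = new HashMap<>();

    @Override
    public void entries(Consumer<Stream<Entry<K, V>>> consumer) {
        consumer.accept(map.entrySet().stream().map(e -> new Entry<>(e.getKey(), e.getValue())));
    }

    @Override
    public <T> T streamAndReturn(Function<Stream<Entry<K, V>>, T> function) {
        return function.apply(map.entrySet().stream().map(e -> new Entry<>(e.getKey(), e.getValue())));
    }

    @Override
    public void removeIf(Predicate<Entry<K, V>> predicate) {
        map.entrySet().removeIf(e -> predicate.test(new Entry<>(e.getKey(), e.getValue())));
    }

    @Override
    public void forEach(BiConsumer<K, V> action) {
        map.forEach(action);
    }

    // Remaining operations delegate straight to the backing map
    @Override
    public void clear() {
        map.clear();
    }

    @Override
    public V get(K key) {
        return map.get(key);
    }

    @Override
    public boolean isEmpty() {
        return map.isEmpty();
    }

    @Override
    public boolean containsKey(K key) {
        return map.containsKey(key);
    }

    @Override
    public void put(K key, V value) {
        map.put(key, value);
    }

    @Override
    public V remove(K key) {
        return map.remove(key);
    }

    @Override
    public int size() {
        return map.size();
    }
}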

View File

@ -50,6 +50,18 @@ public interface Transaction {
*/
int getNumberOfEvents();
/**
* Return the id of an event based on its index.
* @param index the index of the event
* @return the event id
*/
default String getEventId(int index) {
if (index < 0 || index >= getNumberOfEvents()) {
throw new IndexOutOfBoundsException("Index " + index + " is outside the transaction " + getTransactionId() + " event list bounds");
}
return getTransactionId() + "-" + index;
}
/**
* Helper method to get the next event identifier for the transaction.
*
@ -66,9 +78,10 @@ public interface Transaction {
/**
* Helper method that resets the event identifier back to {@code 0}.
*
* <p>
* This should be called when a transaction {@code START} event is detected in the event stream.
* This is required when LOB support is enabled to facilitate the re-mining of existing events.
*/
void start();
}

View File

@ -5,38 +5,18 @@
*/
package io.debezium.connector.oracle.logminer.processor.infinispan;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.infinispan.commons.util.CloseableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.AbstractTransactionCachingLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.CacheProvider;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
import io.debezium.relational.TableId;
import io.debezium.util.Loggings;
/**
* An implementation of {@link io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor}
@ -44,421 +24,22 @@
*
* @author Chris Cranford
*/
public abstract class AbstractInfinispanLogMinerEventProcessor extends AbstractLogMinerEventProcessor<InfinispanTransaction> implements CacheProvider {
public abstract class AbstractInfinispanLogMinerEventProcessor extends AbstractTransactionCachingLogMinerEventProcessor<InfinispanTransaction>
implements CacheProvider<InfinispanTransaction> {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractInfinispanLogMinerEventProcessor.class);
private final OracleConnection jdbcConnection;
private final LogMinerStreamingChangeEventSourceMetrics metrics;
private final OraclePartition partition;
private final OracleOffsetContext offsetContext;
private final EventDispatcher<OraclePartition, TableId> dispatcher;
private InMemoryPendingTransactionsCache inMemoryPendingTransactionsCache = new InMemoryPendingTransactionsCache();
private static AbstractInfinispanLogMinerEventProcessor instance;
public AbstractInfinispanLogMinerEventProcessor(ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
OracleConnection jdbcConnection,
EventDispatcher<OraclePartition, TableId> dispatcher,
OraclePartition partition,
OracleOffsetContext offsetContext,
OracleDatabaseSchema schema,
LogMinerStreamingChangeEventSourceMetrics metrics) {
super(context, connectorConfig, schema, partition, offsetContext, dispatcher, metrics, jdbcConnection);
this.jdbcConnection = jdbcConnection;
this.metrics = metrics;
this.partition = partition;
this.offsetContext = offsetContext;
this.dispatcher = dispatcher;
AbstractInfinispanLogMinerEventProcessor.instance = this;
}
protected void reCreateInMemoryCache() {
try (Stream<String> trStream = getTransactionCache().keySet().stream()) {
trStream.forEach(tr -> {
try (Stream<String> eventStream = getEventCache().keySet().stream()) {
int count = (int) eventStream.filter(k -> k.startsWith(tr + "-")).count();
LOGGER.info("Re-creating in memory cache of event count for transaction '" + tr + "'. No of events found: " + count);
inMemoryPendingTransactionsCache.initKey(tr, count);
}
});
}
}
/**
* Can be used for reporting in Debezium Embedded mode
*/
public static void logCacheStats() {
if (instance != null) {
AbstractInfinispanLogMinerEventProcessor.instance.displayCacheStatistics();
}
else {
LOGGER.trace("AbstractInfinispanLogMinerEventProcessor is not initialized, skipping logging stats.");
}
}
@Override
public void displayCacheStatistics() {
LOGGER.info("Overall Cache Statistics:");
LOGGER.info("\tTransactions : {}", getTransactionCache().size());
LOGGER.info("\tRecent Transactions : {}", getProcessedTransactionsCache().size());
LOGGER.info("\tSchema Changes : {}", getSchemaChangesCache().size());
LOGGER.info("\tEvents : {}", getEventCache().size());
if (!getEventCache().isEmpty() && LOGGER.isDebugEnabled()) {
try (Stream<String> stream = getEventCache().keySet().stream()) {
stream.forEach(eventKey -> LOGGER.debug("\t\tFound Key: {}", eventKey));
}
}
}
@Override
protected boolean isRecentlyProcessed(String transactionId) {
return getProcessedTransactionsCache().containsKey(transactionId);
protected AbstractInfinispanLogMinerEventProcessor(ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
OracleConnection jdbcConnection,
EventDispatcher<OraclePartition, TableId> dispatcher,
OraclePartition partition,
OracleOffsetContext offsetContext,
OracleDatabaseSchema schema,
LogMinerStreamingChangeEventSourceMetrics metrics) {
super(context, connectorConfig, jdbcConnection, dispatcher, partition, offsetContext, schema, metrics);
}
@Override
protected InfinispanTransaction createTransaction(LogMinerEventRow row) {
return new InfinispanTransaction(row.getTransactionId(), row.getScn(), row.getChangeTime(), row.getUserName(), row.getThread());
}
@Override
protected void removeEventWithRowId(LogMinerEventRow row) {
List<String> eventKeys = getTransactionKeysWithPrefix(row.getTransactionId() + "-");
if (eventKeys.isEmpty() && isTransactionIdWithNoSequence(row.getTransactionId())) {
// This means that Oracle LogMiner found an event that should be undone but its corresponding
// undo entry was read in a prior mining session and the transaction's sequence could not be
// resolved. In this case, lets locate the transaction based solely on XIDUSN and XIDSLT.
final String transactionPrefix = getTransactionIdPrefix(row.getTransactionId());
LOGGER.debug("Undo change refers to a transaction that has no explicit sequence, '{}'", row.getTransactionId());
LOGGER.debug("Checking all transactions with prefix '{}'", transactionPrefix);
eventKeys = getTransactionKeysWithPrefix(transactionPrefix);
if (!eventKeys.isEmpty()) {
// Enforce that the keys are always reverse sorted.
eventKeys.sort(EventKeySortComparator.INSTANCE.reversed());
for (String eventKey : eventKeys) {
final LogMinerEvent event = getEventCache().get(eventKey);
if (event != null && event.getRowId().equals(row.getRowId())) {
Loggings.logDebugAndTraceRecord(LOGGER, row, "Undo change on table '{}' applied to transaction '{}'", row.getTableId(), eventKey);
getEventCache().remove(eventKey);
inMemoryPendingTransactionsCache.decrement(row.getTransactionId());
return;
}
}
Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found.", row.getTableId(),
row.getRowId());
}
else if (!getConfig().isLobEnabled()) {
Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since transaction '{}' was not found.", row.getTableId(),
row.getTransactionId());
}
}
else {
// Enforce that the keys are always reverse sorted.
eventKeys.sort(EventKeySortComparator.INSTANCE.reversed());
for (String eventKey : eventKeys) {
final LogMinerEvent event = getEventCache().get(eventKey);
if (event != null && event.getRowId().equals(row.getRowId())) {
LOGGER.debug("Undo applied for event {}.", event);
getEventCache().remove(eventKey);
inMemoryPendingTransactionsCache.decrement(row.getTransactionId());
return;
}
}
Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found.", row.getTableId(),
row.getRowId());
}
}
private List<String> getTransactionKeysWithPrefix(String prefix) {
try (Stream<String> stream = getEventCache().keySet().stream()) {
return stream.filter(k -> k.startsWith(prefix)).collect(Collectors.toList());
}
}
@Override
protected void processRow(OraclePartition partition, LogMinerEventRow row) throws SQLException, InterruptedException {
final String transactionId = row.getTransactionId();
if (isRecentlyProcessed(transactionId)) {
LOGGER.debug("Transaction {} has been seen by connector, skipped.", transactionId);
return;
}
super.processRow(partition, row);
}
@Override
protected boolean hasSchemaChangeBeenSeen(LogMinerEventRow row) {
return getSchemaChangesCache().containsKey(row.getScn().toString());
}
@Override
protected InfinispanTransaction getAndRemoveTransactionFromCache(String transactionId) {
// todo: Infinispan bug?
// When interacting with ISPN with a remote server configuration, the expected
// behavior was that calling the remove method on the cache would return the
// existing entry and remove it from the cache; however it always returned null.
//
// For now, we're going to use get to obtain the value and then remove it after-the-fact.
final InfinispanTransaction transaction = getTransactionCache().get(transactionId);
if (transaction != null) {
getTransactionCache().remove(transactionId);
}
return transaction;
}
@Override
protected void cleanupAfterTransactionRemovedFromCache(InfinispanTransaction transaction, boolean isAbandoned) {
super.cleanupAfterTransactionRemovedFromCache(transaction, isAbandoned);
removeEventsWithTransaction(transaction);
}
@Override
protected Iterator<LogMinerEvent> getTransactionEventIterator(InfinispanTransaction transaction) {
return new Iterator<LogMinerEvent>() {
private final int count = transaction.getNumberOfEvents();
private LogMinerEvent nextEvent;
private int index = 0;
@Override
public boolean hasNext() {
while (index < count) {
nextEvent = getEventCache().get(transaction.getEventId(index));
if (nextEvent == null) {
LOGGER.debug("Event {} must have been undone, skipped.", index);
// There are situations where an event will be removed from the cache when it is
// undone by the undo-row flag. The event id isn't re-used in this use case so
// the iterator automatically detects null entries and skips them by advancing
// to the next entry until either we've reached the number of events or detected
// a non-null entry available for return
index++;
continue;
}
break;
}
return index < count;
}
@Override
public LogMinerEvent next() {
index++;
return nextEvent;
}
};
}
@Override
protected void finalizeTransactionCommit(String transactionId, Scn commitScn) {
getAbandonedTransactionsCache().remove(transactionId);
// cache recently committed transactions by transaction id
if (getConfig().isLobEnabled()) {
getProcessedTransactionsCache().put(transactionId, commitScn.toString());
}
}
@Override
protected void finalizeTransactionRollback(String transactionId, Scn rollbackScn) {
final InfinispanTransaction transaction = getTransactionCache().get(transactionId);
if (transaction != null) {
removeEventsWithTransaction(transaction);
getTransactionCache().remove(transactionId);
}
getAbandonedTransactionsCache().remove(transactionId);
if (getConfig().isLobEnabled()) {
getProcessedTransactionsCache().put(transactionId, rollbackScn.toString());
}
}
@Override
protected void resetTransactionToStart(InfinispanTransaction transaction) {
super.resetTransactionToStart(transaction);
// Flush the change created by the super class to the transaction cache
getTransactionCache().put(transaction.getTransactionId(), transaction);
}
@Override
protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedException {
super.handleSchemaChange(row);
if (row.getTableName() != null) {
getSchemaChangesCache().put(row.getScn().toString(), row.getTableId().identifier());
}
}
@Override
protected void addToTransaction(String transactionId, LogMinerEventRow row, Supplier<LogMinerEvent> eventSupplier) {
if (getAbandonedTransactionsCache().contains(transactionId)) {
LOGGER.warn("Event for abandoned transaction {}, skipped.", transactionId);
return;
}
if (!isRecentlyProcessed(transactionId)) {
InfinispanTransaction transaction = getTransactionCache().get(transactionId);
if (transaction == null) {
LOGGER.trace("Transaction {} is not in cache, creating.", transactionId);
transaction = createTransaction(row);
}
if (isTransactionOverEventThreshold(transaction)) {
abandonTransactionOverEventThreshold(transaction);
return;
}
String eventKey = transaction.getEventId(transaction.getNextEventId());
if (!getEventCache().containsKey(eventKey)) {
// Add new event at eventId offset
LOGGER.trace("Transaction {}, adding event reference at key {}", transactionId, eventKey);
getEventCache().put(eventKey, eventSupplier.get());
metrics.calculateLagFromSource(row.getChangeTime());
inMemoryPendingTransactionsCache.putOrIncrement(transaction.getTransactionId());
}
// When using Infinispan, this extra put is required so that the state is properly synchronized
getTransactionCache().put(transactionId, transaction);
metrics.setActiveTransactionCount(getTransactionCache().size());
}
else {
LOGGER.warn("Event for transaction {} skipped as transaction has been processed.", transactionId);
}
}
@Override
protected int getTransactionEventCount(InfinispanTransaction transaction) {
return inMemoryPendingTransactionsCache.getNumPending(transaction.getTransactionId());
}
@Override
protected PreparedStatement createQueryStatement() throws SQLException {
return jdbcConnection.connection().prepareStatement(getQueryString(),
ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY,
ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
@Override
protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws InterruptedException {
// Cleanup caches based on current state of the transaction cache
final Optional<InfinispanTransaction> oldestTransaction = getOldestTransactionInCache();
final Scn minCacheScn;
final Instant minCacheScnChangeTime;
if (oldestTransaction.isPresent()) {
minCacheScn = oldestTransaction.get().getStartScn();
minCacheScnChangeTime = oldestTransaction.get().getChangeTime();
}
else {
minCacheScn = Scn.NULL;
minCacheScnChangeTime = null;
}
if (!minCacheScn.isNull()) {
abandonTransactions(getConfig().getLogMiningTransactionRetention());
purgeCache(minCacheScn);
}
else {
getSchemaChangesCache().entrySet().removeIf(e -> true);
}
if (getConfig().isLobEnabled()) {
if (getTransactionCache().isEmpty() && !maxCommittedScn.isNull()) {
offsetContext.setScn(maxCommittedScn);
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
}
else {
if (!minCacheScn.isNull()) {
getProcessedTransactionsCache().entrySet().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0);
offsetContext.setScn(minCacheScn.subtract(Scn.valueOf(1)));
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
}
}
return offsetContext.getScn();
}
else {
if (!getLastProcessedScn().isNull() && getLastProcessedScn().compareTo(endScn) < 0) {
// If the last processed SCN is before the endScn we need to use the last processed SCN as the
// next starting point as the LGWR buffer didn't flush all entries from memory to disk yet.
endScn = getLastProcessedScn();
}
// update offsets
offsetContext.setScn(endScn);
metrics.setOldestScnDetails(minCacheScn, minCacheScnChangeTime);
metrics.setOffsetScn(endScn);
// optionally dispatch a heartbeat event
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
return endScn;
}
}
/**
* Purge the necessary caches with all entries that occurred prior to the specified change number.
*
* NOTE: This method is abstract despite the code used by both all implementations being identical.
* This is because the method needed {@code entrySet()} is made available on two different concrete
* interfaces between the embedded and remote cache implementations, and therefore we need to access
* this method from the concrete implementation classes (RemoteCache and Cache) rather than from
* the common class used by CacheProvider (BasicCache).
*
* @param minCacheScn the minimum system change number to keep entries until
*/
protected abstract void purgeCache(Scn minCacheScn);
/**
* Helper method to remove entries that match the given predicate from a closeable iterator.
* This method guarantees that the underlying resources are released at the end of the operation.
*
* @param iterator the iterator
* @param filter the predicate
* @param <K> the key type
* @param <V> the value type
*/
protected <K, V> void removeIf(CloseableIterator<Map.Entry<K, V>> iterator, Predicate<Map.Entry<K, V>> filter) {
try (CloseableIterator<Map.Entry<K, V>> it = iterator) {
while (it.hasNext()) {
final Map.Entry<K, V> entry = it.next();
if (filter.test(entry)) {
it.remove();
}
}
}
}
private void removeEventsWithTransaction(InfinispanTransaction transaction) {
// Clear the event queue for the transaction
for (int i = 0; i < transaction.getNumberOfEvents(); ++i) {
getEventCache().remove(transaction.getEventId(i));
}
inMemoryPendingTransactionsCache.remove(transaction.getTransactionId());
}
/**
* A comparator that guarantees that the sort order applied to event keys is such that
* they are treated as numerical values, sorted as numeric values rather than strings
* which would allow "100" to come before "9".
*/
private static class EventKeySortComparator implements Comparator<String> {
public static EventKeySortComparator INSTANCE = new EventKeySortComparator();
@Override
public int compare(String o1, String o2) {
if (o1 == null || !o1.contains("-")) {
throw new IllegalStateException("Event Key must be in the format of <transaction>-<event>");
}
if (o2 == null || !o2.contains("-")) {
throw new IllegalStateException("Event Key must be in the format of <transaction>-<event>");
}
final String[] s1 = o1.split("-");
final String[] s2 = o2.split("-");
// Compare transaction ids, these should generally be identical.
int result = s1[0].compareTo(s2[0]);
if (result == 0) {
result = Long.compare(Long.parseLong(s1[1]), Long.parseLong(s2[1]));
}
return result;
}
}
}

View File

@ -12,11 +12,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.infinispan.Cache;
import org.infinispan.commons.api.BasicCache;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfiguration;
@ -35,9 +31,10 @@
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.processor.CacheProvider;
import io.debezium.connector.oracle.logminer.processor.LogMinerCache;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
import io.debezium.relational.TableId;
@ -58,10 +55,10 @@ public class EmbeddedInfinispanLogMinerEventProcessor extends AbstractInfinispan
private final EmbeddedCacheManager cacheManager;
private final boolean dropBufferOnStop;
private final Cache<String, InfinispanTransaction> transactionCache;
private final Cache<String, LogMinerEvent> eventCache;
private final Cache<String, String> processedTransactionsCache;
private final Cache<String, String> schemaChangesCache;
private final LogMinerCache<String, InfinispanTransaction> transactionCache;
private final LogMinerCache<String, LogMinerEvent> eventCache;
private final LogMinerCache<String, String> processedTransactionsCache;
private final LogMinerCache<String, String> schemaChangesCache;
public EmbeddedInfinispanLogMinerEventProcessor(ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
@ -107,87 +104,26 @@ public void close() throws Exception {
}
@Override
public BasicCache<String, InfinispanTransaction> getTransactionCache() {
public LogMinerCache<String, InfinispanTransaction> getTransactionCache() {
return transactionCache;
}
@Override
public BasicCache<String, LogMinerEvent> getEventCache() {
public LogMinerCache<String, LogMinerEvent> getEventCache() {
return eventCache;
}
@Override
public BasicCache<String, String> getSchemaChangesCache() {
public LogMinerCache<String, String> getSchemaChangesCache() {
return schemaChangesCache;
}
@Override
public BasicCache<String, String> getProcessedTransactionsCache() {
public LogMinerCache<String, String> getProcessedTransactionsCache() {
return processedTransactionsCache;
}
@Override
protected Scn getTransactionCacheMinimumScn() {
Scn minimumScn = Scn.NULL;
try (CloseableIterator<InfinispanTransaction> iterator = transactionCache.values().iterator()) {
while (iterator.hasNext()) {
final Scn transactionScn = iterator.next().getStartScn();
if (minimumScn.isNull()) {
minimumScn = transactionScn;
}
else {
if (transactionScn.compareTo(minimumScn) < 0) {
minimumScn = transactionScn;
}
}
}
}
return minimumScn;
}
@Override
protected Optional<InfinispanTransaction> getOldestTransactionInCache() {
InfinispanTransaction transaction = null;
try (CloseableIterator<InfinispanTransaction> iterator = transactionCache.values().iterator()) {
if (iterator.hasNext()) {
// Seed with the first element
transaction = iterator.next();
while (iterator.hasNext()) {
final InfinispanTransaction entry = iterator.next();
int comparison = entry.getStartScn().compareTo(transaction.getStartScn());
if (comparison < 0) {
// if entry has a smaller scn, it came before.
transaction = entry;
}
else if (comparison == 0) {
// if entry has an equal scn, compare the change times.
if (entry.getChangeTime().isBefore(transaction.getChangeTime())) {
transaction = entry;
}
}
}
}
}
return Optional.ofNullable(transaction);
}
@Override
protected String getFirstActiveTransactionKey() {
try (CloseableIterator<String> iterator = transactionCache.keySet().iterator()) {
if (iterator.hasNext()) {
return iterator.next();
}
}
return null;
}
@Override
protected void purgeCache(Scn minCacheScn) {
removeIf(processedTransactionsCache.entrySet().iterator(), entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0);
removeIf(schemaChangesCache.entrySet().iterator(), entry -> Scn.valueOf(entry.getKey()).compareTo(minCacheScn) < 0);
}
private <K, V> Cache<K, V> createCache(String cacheName, OracleConnectorConfig connectorConfig, Field field) {
private <K, V> LogMinerCache<K, V> createCache(String cacheName, OracleConnectorConfig connectorConfig, Field field) {
Objects.requireNonNull(cacheName);
final String cacheConfiguration = connectorConfig.getConfig().getString(field);
@ -195,7 +131,7 @@ private <K, V> Cache<K, V> createCache(String cacheName, OracleConnectorConfig c
// define the cache, parsing the supplied XML configuration
cacheManager.defineConfiguration(cacheName, parseAndGetConfiguration(cacheName, cacheConfiguration));
return cacheManager.getCache(cacheName);
return new InfinispanLogMinerCache<>(cacheManager.getCache(cacheName));
}
private Configuration parseAndGetConfiguration(String cacheName, String configuration) {

View File

@ -0,0 +1,92 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.infinispan;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.infinispan.client.hotrod.Flag;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commons.api.BasicCache;
import io.debezium.connector.oracle.logminer.processor.LogMinerCache;
public class InfinispanLogMinerCache<K, V> implements LogMinerCache<K, V> {
private final BasicCache<K, V> cache;
public InfinispanLogMinerCache(BasicCache<K, V> cache) {
this.cache = cache;
}
@Override
public boolean containsKey(K key) {
return cache.containsKey(key);
}
@Override
public int size() {
return cache.size();
}
@Override
public boolean isEmpty() {
return cache.isEmpty();
}
@Override
public V remove(K key) {
if (cache instanceof RemoteCache<K, V> remoteCache) {
return remoteCache.withFlags(Flag.FORCE_RETURN_VALUE).remove(key);
}
return cache.remove(key);
}
@Override
public V get(K key) {
return cache.get(key);
}
@Override
public void put(K key, V value) {
cache.put(key, value);
}
@Override
public void clear() {
cache.clear();
}
@Override
public void forEach(BiConsumer<K, V> action) {
cache.forEach(action);
}
@Override
public void removeIf(Predicate<Entry<K, V>> predicate) {
this.cache.entrySet().removeIf(kvEntry -> predicate.test(new Entry<>(kvEntry.getKey(), kvEntry.getValue())));
}
@Override
public void entries(Consumer<Stream<Entry<K, V>>> streamConsumer) {
try (Stream<Entry<K, V>> stream = this.cache.entrySet()
.stream()
.map(e -> new Entry<>(e.getKey(), e.getValue()))) {
streamConsumer.accept(stream);
}
}
@Override
public <T> T streamAndReturn(Function<Stream<Entry<K, V>>, T> function) {
try (Stream<Map.Entry<K, V>> stream = this.cache.entrySet().stream()) {
return function.apply(stream.map(e -> new Entry<>(e.getKey(), e.getValue())));
}
}
}
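As a usage sketch (cache name and values hypothetical), the wrapper makes embedded and remote caches interchangeable behind LogMinerCache, with Flag.FORCE_RETURN_VALUE applied transparently so that remove() returns the previous value even against a remote server:

RemoteCacheManager manager = new RemoteCacheManager();
LogMinerCache<String, String> cache = new InfinispanLogMinerCache<>(manager.getCache("processed-transactions"));
cache.put("tx-1", "12345");
String removed = cache.remove("tx-1"); // "12345" rather than null, thanks to FORCE_RETURN_VALUE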

View File

@ -12,18 +12,14 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import org.checkerframework.checker.units.qual.C;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.impl.ConfigurationProperties;
import org.infinispan.commons.api.BasicCache;
import org.infinispan.commons.configuration.XMLStringConfiguration;
import org.infinispan.commons.util.CloseableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -34,9 +30,10 @@
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.processor.CacheProvider;
import io.debezium.connector.oracle.logminer.processor.LogMinerCache;
import io.debezium.connector.oracle.logminer.processor.infinispan.marshalling.LogMinerEventMarshallerImpl;
import io.debezium.connector.oracle.logminer.processor.infinispan.marshalling.TransactionMarshallerImpl;
import io.debezium.pipeline.EventDispatcher;
@ -64,10 +61,10 @@ public class RemoteInfinispanLogMinerEventProcessor extends AbstractInfinispanLo
private final RemoteCacheManager cacheManager;
private final boolean dropBufferOnStop;
private final RemoteCache<String, InfinispanTransaction> transactionCache;
private final RemoteCache<String, LogMinerEvent> eventCache;
private final RemoteCache<String, String> processedTransactionsCache;
private final RemoteCache<String, String> schemaChangesCache;
private final LogMinerCache<String, InfinispanTransaction> transactionCache;
private final LogMinerCache<String, LogMinerEvent> eventCache;
private final LogMinerCache<String, String> processedTransactionsCache;
private final LogMinerCache<String, String> schemaChangesCache;
public RemoteInfinispanLogMinerEventProcessor(ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
@ -119,86 +116,25 @@ public void close() throws Exception {
}
@Override
public BasicCache<String, InfinispanTransaction> getTransactionCache() {
public LogMinerCache<String, InfinispanTransaction> getTransactionCache() {
return transactionCache;
}
@Override
public BasicCache<String, LogMinerEvent> getEventCache() {
public LogMinerCache<String, LogMinerEvent> getEventCache() {
return eventCache;
}
@Override
public BasicCache<String, String> getSchemaChangesCache() {
public LogMinerCache<String, String> getSchemaChangesCache() {
return schemaChangesCache;
}
@Override
public BasicCache<String, String> getProcessedTransactionsCache() {
public LogMinerCache<String, String> getProcessedTransactionsCache() {
return processedTransactionsCache;
}
@Override
protected Scn getTransactionCacheMinimumScn() {
Scn minimumScn = Scn.NULL;
try (CloseableIterator<InfinispanTransaction> iterator = transactionCache.values().iterator()) {
while (iterator.hasNext()) {
final Scn transactionScn = iterator.next().getStartScn();
if (minimumScn.isNull()) {
minimumScn = transactionScn;
}
else {
if (transactionScn.compareTo(minimumScn) < 0) {
minimumScn = transactionScn;
}
}
}
}
return minimumScn;
}
@Override
protected Optional<InfinispanTransaction> getOldestTransactionInCache() {
InfinispanTransaction transaction = null;
try (CloseableIterator<InfinispanTransaction> iterator = transactionCache.values().iterator()) {
if (iterator.hasNext()) {
// Seed with the first element
transaction = iterator.next();
while (iterator.hasNext()) {
final InfinispanTransaction entry = iterator.next();
int comparison = entry.getStartScn().compareTo(transaction.getStartScn());
if (comparison < 0) {
// if entry has a smaller scn, it came before.
transaction = entry;
}
else if (comparison == 0) {
// if entry has an equal scn, compare the change times.
if (entry.getChangeTime().isBefore(transaction.getChangeTime())) {
transaction = entry;
}
}
}
}
}
return Optional.ofNullable(transaction);
}
@Override
protected String getFirstActiveTransactionKey() {
try (CloseableIterator<String> iterator = transactionCache.keySet().iterator()) {
if (iterator.hasNext()) {
return iterator.next();
}
}
return null;
}
@Override
protected void purgeCache(Scn minCacheScn) {
removeIf(processedTransactionsCache.entrySet().iterator(), entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0);
removeIf(schemaChangesCache.entrySet().iterator(), entry -> Scn.valueOf(entry.getKey()).compareTo(minCacheScn) < 0);
}
private Properties getHotrodClientProperties(OracleConnectorConfig connectorConfig) {
final Map<String, String> clientSettings = connectorConfig.getConfig()
.subset(HOTROD_CLIENT_LOOKUP_PREFIX, true)
@ -215,14 +151,14 @@ private Properties getHotrodClientProperties(OracleConnectorConfig connectorConf
return properties;
}
private <C, V> RemoteCache<C, V> createCache(String cacheName, OracleConnectorConfig connectorConfig, Field field) {
private <C, V> LogMinerCache<C, V> createCache(String cacheName, OracleConnectorConfig connectorConfig, Field field) {
Objects.requireNonNull(cacheName);
RemoteCache<C, V> cache = cacheManager.getCache(cacheName);
if (cache != null) {
// cache is already defined, simply return it
LOGGER.info("Remote cache '{}' already defined.", cacheName);
return cache;
return new InfinispanLogMinerCache<>(cache);
}
final String cacheConfiguration = connectorConfig.getConfig().getString(field);
@ -234,6 +170,6 @@ private <C, V> RemoteCache<C, V> createCache(String cacheName, OracleConnectorCo
}
LOGGER.info("Created remote infinispan cache: {}", cacheName);
return cache;
return new InfinispanLogMinerCache<>(cache);
}
}
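The buffer-introspection methods deleted above no longer live in the remote processor; per the commit message they presumably collapse into the common base class, written once against LogMinerCache. A hedged sketch of what that could look like (names and the T extends Transaction bound are assumed from the reworked processor signature; Comparator is java.util.Comparator):

// Sketch only: assumes the base class exposes getTransactionCache() as a LogMinerCache<String, T>.
protected Scn getTransactionCacheMinimumScn() {
    // Stream the buffered transactions and take the smallest start SCN.
    return getTransactionCache().streamAndReturn(stream -> stream
            .map(LogMinerCache.Entry::getValue)
            .map(Transaction::getStartScn)
            .min(Scn::compareTo)
            .orElse(Scn.NULL));
}

protected Optional<T> getOldestTransactionInCache() {
    // Oldest = smallest start SCN; ties broken by the earlier change time.
    return getTransactionCache().streamAndReturn(stream -> stream
            .map(LogMinerCache.Entry::getValue)
            .min(Comparator.comparing(Transaction::getStartScn)
                    .thenComparing(Transaction::getChangeTime)));
}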

View File

@ -0,0 +1,83 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.memory;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import io.debezium.connector.oracle.logminer.processor.LogMinerCache;
public class MemoryBasedLogMinerCache<K, V> implements LogMinerCache<K, V> {
private final Map<K, V> map;
public MemoryBasedLogMinerCache(Map<K, V> map) {
this.map = map;
}
@Override
public void clear() {
map.clear();
}
@Override
public int size() {
return map.size();
}
@Override
public boolean isEmpty() {
return map.isEmpty();
}
@Override
public boolean containsKey(K key) {
return map.containsKey(key);
}
@Override
public V remove(K key) {
return map.remove(key);
}
@Override
public V get(K key) {
return map.get(key);
}
@Override
public void put(K key, V value) {
map.put(key, value);
}
@Override
public void forEach(BiConsumer<K, V> action) {
map.forEach(action);
}
@Override
public void removeIf(Predicate<Entry<K, V>> predicate) {
this.map.entrySet().removeIf(kvEntry -> predicate.test(new Entry<>(kvEntry.getKey(), kvEntry.getValue())));
}
@Override
public void entries(Consumer<Stream<Entry<K, V>>> streamConsumer) {
streamConsumer.accept(map.entrySet().stream()
.map(e -> new LogMinerCache.Entry<>(e.getKey(), e.getValue())));
}
@Override
public <T> T streamAndReturn(Function<Stream<Entry<K, V>>, T> function) {
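// Closing is a no-op for an in-memory stream; the try-with-resources simply mirrors
// the resource contract of the Infinispan implementation.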
try (Stream<Map.Entry<K, V>> stream = map.entrySet().stream()) {
return function.apply(stream.map(e -> new Entry<>(e.getKey(), e.getValue())));
}
}
}
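For illustration, a MemoryBasedLogMinerCache behaves like its backing map plus the stream helpers. The SCN-keyed purge below mirrors the purgeCache logic earlier in this commit; the keys and values are made up:

// Illustrative values only; schema-change entries are keyed by SCN string, as in purgeCache.
LogMinerCache<String, String> schemaChanges = new MemoryBasedLogMinerCache<>(new HashMap<>());
schemaChanges.put("742634717", "some DDL event");
schemaChanges.put("742634950", "a later DDL event");

Scn minCacheScn = Scn.valueOf("742634800");
schemaChanges.removeIf(entry -> Scn.valueOf(entry.getKey()).compareTo(minCacheScn) < 0);
// Only the 742634950 entry remains.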

View File

@ -14,6 +14,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.slf4j.Logger;
@ -29,6 +30,7 @@
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.LogMinerCache;
import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
@ -41,6 +43,7 @@
*
* @author Chris Cranford
*/
// TODO: can this now be implemented by the common caching base class, backed by a simple map?
public class MemoryLogMinerEventProcessor extends AbstractLogMinerEventProcessor<MemoryTransaction> {
private static final Logger LOGGER = LoggerFactory.getLogger(MemoryLogMinerEventProcessor.class);
@ -49,10 +52,11 @@ public class MemoryLogMinerEventProcessor extends AbstractLogMinerEventProcessor
private final OracleOffsetContext offsetContext;
private final LogMinerStreamingChangeEventSourceMetrics metrics;
private final HashMap<String, MemoryTransaction> transactionHashMap = new HashMap<>();
/**
* Cache of transactions, keyed by the transaction's unique identifier
*/
private final Map<String, MemoryTransaction> transactionCache = new HashMap<>();
private final LogMinerCache<String, MemoryTransaction> transactionCache = new MemoryBasedLogMinerCache<>(transactionHashMap);
/**
* Cache of processed transactions (committed or rolled back), keyed by the transaction's unique identifier.
*/
@ -75,7 +79,7 @@ public MemoryLogMinerEventProcessor(ChangeEventSourceContext context,
}
@Override
protected Map<String, MemoryTransaction> getTransactionCache() {
public LogMinerCache<String, MemoryTransaction> getTransactionCache() {
return transactionCache;
}
@ -95,17 +99,23 @@ protected void removeEventWithRowId(LogMinerEventRow row) {
final String transactionPrefix = getTransactionIdPrefix(row.getTransactionId());
LOGGER.debug("Undo change refers to a transaction that has no explicit sequence, '{}'", row.getTransactionId());
LOGGER.debug("Checking all transactions with prefix '{}'", transactionPrefix);
for (String transactionKey : getTransactionCache().keySet()) {
final AtomicBoolean eventRemoved = new AtomicBoolean(false);
getTransactionCache().forEach((transactionKey, candidate) -> {
if (transactionKey.startsWith(transactionPrefix)) {
transaction = getTransactionCache().get(transactionKey);
if (transaction != null && transaction.removeEventWithRowId(row.getRowId())) {
// The forEach callback already supplies the cached value, so no second lookup by key is needed.
if (!eventRemoved.get() && candidate != null && candidate.removeEventWithRowId(row.getRowId())) {
// We successfully found a transaction with the same XISUSN and XIDSLT and that
// transaction included a change for the specified row id.
Loggings.logDebugAndTraceRecord(LOGGER, row, "Undo change on table '{}' was applied to transaction '{}'", row.getTableId(), transactionKey);
return;
eventRemoved.set(true);
}
}
}
});
if (eventRemoved.get()) {
// A return inside the forEach callback only exits the callback, not this method, so the
// success flag is tracked explicitly and the method exits here once the undo is applied.
return;
}
Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found", row.getTableId(),
row.getRowId());
}
@ -167,8 +177,14 @@ protected void finalizeTransactionRollback(String transactionId, Scn rollbackScn
@Override
protected String getFirstActiveTransactionKey() {
final Iterator<String> keyIterator = transactionCache.keySet().iterator();
return keyIterator.hasNext() ? keyIterator.next() : null;
AtomicReference<String> result = new AtomicReference<>();
// Stream.findFirst replaces the manual iterator; the holder carries the value out of the callback.
transactionCache.keys(keys -> result.set(keys.findFirst().orElse(null)));
return result.get();
}
@Override
@ -273,33 +289,38 @@ protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws Inter
@Override
protected Scn getTransactionCacheMinimumScn() {
return transactionCache.values().stream()
.map(MemoryTransaction::getStartScn)
.min(Scn::compareTo)
.orElse(Scn.NULL);
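// The stream is only valid inside the callback, and a lambda cannot assign to an ordinary
// local variable, so an AtomicReference carries the minimum SCN out.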
AtomicReference<Scn> result = new AtomicReference<>();
transactionCache.values(stream -> {
result.set(stream.map(MemoryTransaction::getStartScn)
.min(Scn::compareTo)
.orElse(Scn.NULL));
});
return result.get();
}
@Override
// TODO: move this into the common cache-aware base class?
protected Optional<MemoryTransaction> getOldestTransactionInCache() {
MemoryTransaction transaction = null;
if (!transactionCache.isEmpty()) {
// Seed with the first element
transaction = transactionCache.values().iterator().next();
for (MemoryTransaction entry : transactionCache.values()) {
int comparison = entry.getStartScn().compareTo(transaction.getStartScn());
if (comparison < 0) {
// if entry has a smaller scn, it came before.
transaction = entry;
}
else if (comparison == 0) {
// if entry has an equal scn, compare the change times.
if (entry.getChangeTime().isBefore(transaction.getChangeTime())) {
transaction = entry;
}
}
return getTransactionCache().streamAndReturn(entryStream -> entryStream
.map(LogMinerCache.Entry::getValue)
.min(this::compareTransactions));
return Optional.ofNullable(transaction);
}
protected MemoryTransaction compareOldest(MemoryTransaction first, MemoryTransaction second) {
// Returns the older of the two transactions, delegating to compareTransactions for the ordering.
return compareTransactions(first, second) <= 0 ? first : second;
}
protected int compareTransactions(MemoryTransaction first, MemoryTransaction second) {
// Order by start SCN; on a tie, the earlier change time is considered older, matching
// the tie-break in the loop-based implementation removed above.
int comparison = first.getStartScn().compareTo(second.getStartScn());
if (comparison == 0) {
comparison = first.getChangeTime().compareTo(second.getChangeTime());
}
return comparison;
}
}

View File

@ -33,7 +33,7 @@
import io.debezium.connector.oracle.OracleConnectorConfig.LogMiningBufferType;
import io.debezium.connector.oracle.OracleConnectorConfig.LogMiningStrategy;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.processor.infinispan.CacheProvider;
import io.debezium.connector.oracle.logminer.processor.CacheProvider;
import io.debezium.connector.oracle.rest.DebeziumOracleConnectorResourceIT;
import io.debezium.embedded.async.AsyncEmbeddedEngine;
import io.debezium.jdbc.JdbcConfiguration;