DBZ-7758 Fold abstract processor implementations

Chris Cranford 2024-08-20 17:21:54 -04:00 committed by Jiri Pechanec
parent b21b057729
commit e151eb74d3
5 changed files with 328 additions and 467 deletions

View File: AbstractLogMinerEventProcessor.java

@@ -13,9 +13,11 @@
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -70,6 +72,7 @@
import io.debezium.relational.Tables;
import io.debezium.text.ParsingException;
import io.debezium.util.Clock;
import io.debezium.util.Loggings;
import io.debezium.util.Strings;
import oracle.sql.RAW;
@@ -79,15 +82,15 @@
*
* @author Chris Cranford
*/
public abstract class AbstractLogMinerEventProcessor<T extends Transaction> implements LogMinerEventProcessor {
public abstract class AbstractLogMinerEventProcessor<T extends Transaction> implements LogMinerEventProcessor, CacheProvider<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLogMinerEventProcessor.class);
private static final Logger ABANDONED_DETAILS_LOGGER = LoggerFactory.getLogger(AbstractLogMinerEventProcessor.class.getName() + ".AbandonedDetails");
private static final String NO_SEQUENCE_TRX_ID_SUFFIX = "ffffffff";
private static final String XML_WRITE_PREAMBLE = "XML_REDO := ";
private static final String XML_WRITE_PREAMBLE_NULL = XML_WRITE_PREAMBLE + "NULL";
protected final OracleConnection jdbcConnection;
private final OracleConnection jdbcConnection;
private final ChangeEventSourceContext context;
private final OracleConnectorConfig connectorConfig;
private final OracleDatabaseSchema schema;
@@ -111,6 +114,7 @@ public abstract class AbstractLogMinerEventProcessor<T extends Transaction> impl
private boolean sequenceUnavailable = false;
private final Set<String> abandonedTransactionsCache = new HashSet<>();
private final InMemoryPendingTransactionsCache inMemoryPendingTransactionsCache = new InMemoryPendingTransactionsCache();
protected AbstractLogMinerEventProcessor(ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
@@ -137,6 +141,18 @@ protected AbstractLogMinerEventProcessor(ChangeEventSourceContext context,
this.jdbcConnection = jdbcConnection;
}
protected void reCreateInMemoryCache() {
getTransactionCache().keys(trStream -> {
trStream.forEach(tr -> {
getEventCache().keys(eventStream -> {
int count = (int) eventStream.filter(e -> e.startsWith(tr + "-")).count();
LOGGER.info("Re-creating in memory cache of event count for transaction '" + tr + "'. No of events found: " + count);
inMemoryPendingTransactionsCache.initKey(tr, count);
});
});
});
}
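The counter rebuilt here backs getTransactionEventCount(T) further down and is maintained by the putOrIncrement/decrement calls as events are buffered and undone. The implementation of InMemoryPendingTransactionsCache is not part of this diff; judging only from the calls made on it, a minimal sketch of its contract might look like the following (the field name and Integer boxing are assumptions):

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch of the per-transaction event counter used above;
    // only the operations visible in this diff are modeled.
    class PendingTransactionCounterSketch {
        private final Map<String, Integer> pending = new HashMap<>();

        // Seeds a count after restart, as reCreateInMemoryCache() does.
        void initKey(String transactionId, int count) {
            pending.put(transactionId, count);
        }

        // Invoked when a new event is buffered for the transaction.
        void putOrIncrement(String transactionId) {
            pending.merge(transactionId, 1, Integer::sum);
        }

        // Invoked when a buffered event is undone.
        void decrement(String transactionId) {
            pending.computeIfPresent(transactionId, (k, v) -> v - 1);
        }

        // Invoked when the whole transaction is discarded.
        void remove(String transactionId) {
            pending.remove(transactionId);
        }

        // Backs getTransactionEventCount(T).
        int getNumPending(String transactionId) {
            return pending.getOrDefault(transactionId, 0);
        }
    }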
protected Set<String> getAbandonedTransactionsCache() {
return abandonedTransactionsCache;
}
@@ -156,7 +172,7 @@ protected OracleDatabaseSchema getSchema() {
* @return true if the transaction has been recently processed, false otherwise
*/
protected boolean isRecentlyProcessed(String transactionId) {
return false;
return getProcessedTransactionsCache().containsKey(transactionId);
}
/**
@@ -166,7 +182,7 @@ protected boolean isRecentlyProcessed(String transactionId) {
* @return true if the schema change has been seen, false otherwise.
*/
protected boolean hasSchemaChangeBeenSeen(LogMinerEventRow row) {
return false;
return getSchemaChangesCache().containsKey(row.getScn().toString());
}
/**
@@ -187,12 +203,6 @@ protected Instant getLastProcessedScnChangeTime() {
return lastProcessedScnChangeTime;
}
/**
* Returns the {@code TransactionCache} implementation.
* @return the transaction cache, never {@code null}
*/
protected abstract LogMinerCache<String, T> getTransactionCache();
/**
* Creates a new transaction based on the supplied {@code START} event.
*
@@ -206,7 +216,39 @@ protected Instant getLastProcessedScnChangeTime() {
*
* @param row the event row that contains the row identifier, must not be {@code null}
*/
protected abstract void removeEventWithRowId(LogMinerEventRow row);
protected void removeEventWithRowId(LogMinerEventRow row) {
// locate the events based solely on XIDUSN and XIDSLT.
String basePrefix = getTransactionIdPrefix(row.getTransactionId());
List<String> eventKeysForBasePrefix = getTransactionKeysWithPrefix(basePrefix);
String transactionIdPrefix = row.getTransactionId() + "-";
// filter the existing list down to the events for the transaction
List<String> eventKeys = eventKeysForBasePrefix.stream()
.filter(k -> k.startsWith(transactionIdPrefix))
.toList();
if (eventKeys.isEmpty() && isTransactionIdWithNoSequence(row.getTransactionId())) {
// This means that Oracle LogMiner found an event that should be undone but its corresponding
// undo entry was read in a prior mining session and the transaction's sequence could not be
// resolved.
LOGGER.debug("Undo change refers to a transaction that has no explicit sequence, '{}'", row.getTransactionId());
LOGGER.debug("Checking all transactions with prefix '{}'", basePrefix);
eventKeys = eventKeysForBasePrefix;
Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found.", row.getTableId(),
row.getRowId());
}
if (!eventKeys.isEmpty()) {
removeEvents(row, eventKeys);
}
else if (!getConfig().isLobEnabled()) {
Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since transaction '{}' was not found.", row.getTableId(),
row.getTransactionId());
}
}
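When the undo row's transaction id carries the NO_SEQUENCE_TRX_ID_SUFFIX ("ffffffff"), the exact-id lookup above finds nothing and the search widens to the XIDUSN/XIDSLT base prefix. A hypothetical illustration of that fallback; the id layout and prefix length are assumptions, since the real logic lives in getTransactionIdPrefix and isTransactionIdWithNoSequence, which are outside this hunk:

    import java.util.List;

    public class UndoPrefixFallbackDemo {
        // Mirrors NO_SEQUENCE_TRX_ID_SUFFIX declared at the top of the class.
        static final String NO_SEQUENCE_SUFFIX = "ffffffff";

        public static void main(String[] args) {
            // Hypothetical event keys in "<transactionId>-<eventIndex>" form.
            List<String> keys = List.of("0A000B00C1D2E3F4-0", "0A000B00C1D2E3F4-1");

            // An undo row whose transaction sequence could not be resolved.
            String undoTxId = "0A000B00" + NO_SEQUENCE_SUFFIX;

            // Exact-id match finds nothing...
            boolean exactMatch = keys.stream().anyMatch(k -> k.startsWith(undoTxId + "-"));

            // ...so the search falls back to the XIDUSN/XIDSLT base prefix.
            String basePrefix = "0A000B00"; // assumed prefix layout for illustration
            List<String> fallback = keys.stream().filter(k -> k.startsWith(basePrefix)).toList();

            System.out.println(exactMatch + " -> fallback candidates: " + fallback);
            // Prints: false -> fallback candidates: [0A000B00C1D2E3F4-0, 0A000B00C1D2E3F4-1]
        }
    }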
/**
* Returns the number of events associated with the specified transaction.
@@ -214,7 +256,9 @@ protected Instant getLastProcessedScnChangeTime() {
* @param transaction the transaction, must not be {@code null}
* @return the number of events in the transaction
*/
protected abstract int getTransactionEventCount(T transaction);
protected int getTransactionEventCount(T transaction) {
return inMemoryPendingTransactionsCache.getNumPending(transaction.getTransactionId());
}
// todo: can this be removed in favor of a single implementation?
protected boolean isTrxIdRawValue() {
@@ -279,6 +323,20 @@ public Scn process(Scn startScn, Scn endScn) throws SQLException, InterruptedExc
}
}
@Override
public void displayCacheStatistics() {
LOGGER.info("Overall Cache Statistics:");
LOGGER.info("\tTransactions : {}", getTransactionCache().size());
LOGGER.info("\tRecent Transactions : {}", getProcessedTransactionsCache().size());
LOGGER.info("\tSchema Changes : {}", getSchemaChangesCache().size());
LOGGER.info("\tEvents : {}", getEventCache().size());
if (!getEventCache().isEmpty() && LOGGER.isDebugEnabled()) {
getEventCache().keys(stream -> {
stream.forEach(eventKey -> LOGGER.debug("\t\tFound Key: {}", eventKey));
});
}
}
/**
* Get the LogMiner query that will be used to fetch results.
*
@@ -294,7 +352,12 @@ protected String getQueryString() {
* @return a prepared query statement, never {@code null}
* @throws SQLException if a database exception occurred creating the statement
*/
protected abstract PreparedStatement createQueryStatement() throws SQLException;
protected PreparedStatement createQueryStatement() throws SQLException {
return jdbcConnection.connection().prepareStatement(getQueryString(),
ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY,
ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
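The statement is forward-only and read-only, and HOLD_CURSORS_OVER_COMMIT keeps the mining cursor usable if the same connection commits mid-stream. A sketch of how the statement might be consumed for one mining window, written as if it were a member of this class; the bind order and fetch size are assumptions, not part of this hunk:

    // Hypothetical caller inside this class; binds the SCN window and feeds
    // the rows to processResults(...) declared further down.
    private void mineWindow(OraclePartition partition, Scn startScn, Scn endScn)
            throws SQLException, InterruptedException {
        try (PreparedStatement statement = createQueryStatement()) {
            statement.setFetchSize(2_000);                // assumed fetch size
            statement.setString(1, startScn.toString());  // assumed bind order
            statement.setString(2, endScn.toString());
            try (ResultSet resultSet = statement.executeQuery()) {
                processResults(partition, resultSet);
            }
        }
    }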
/**
* Calculates the new starting system change number based on the current processing range.
@@ -304,7 +367,60 @@ protected String getQueryString() {
* @return the system change number to start the next mining iteration from, never {@code null}
* @throws InterruptedException if the current thread is interrupted
*/
protected abstract Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws InterruptedException;
protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws InterruptedException {
// Cleanup caches based on current state of the transaction cache
final Optional<T> oldestTransaction = getOldestTransactionInCache();
final Scn minCacheScn;
final Instant minCacheScnChangeTime;
if (oldestTransaction.isPresent()) {
minCacheScn = oldestTransaction.get().getStartScn();
minCacheScnChangeTime = oldestTransaction.get().getChangeTime();
}
else {
minCacheScn = Scn.NULL;
minCacheScnChangeTime = null;
}
if (!minCacheScn.isNull()) {
abandonTransactions(getConfig().getLogMiningTransactionRetention());
purgeCache(minCacheScn);
}
else {
getSchemaChangesCache().removeIf(e -> true);
}
if (getConfig().isLobEnabled()) {
if (getTransactionCache().isEmpty() && !maxCommittedScn.isNull()) {
offsetContext.setScn(maxCommittedScn);
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
}
else {
if (!minCacheScn.isNull()) {
getProcessedTransactionsCache().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0);
offsetContext.setScn(minCacheScn.subtract(Scn.valueOf(1)));
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
}
}
return offsetContext.getScn();
}
else {
if (!getLastProcessedScn().isNull() && getLastProcessedScn().compareTo(endScn) < 0) {
// If the last processed SCN is before the endScn we need to use the last processed SCN as the
// next starting point as the LGWR buffer didn't flush all entries from memory to disk yet.
endScn = getLastProcessedScn();
}
offsetContext.setScn(minCacheScn.isNull() ? endScn : minCacheScn.subtract(Scn.valueOf(1)));
metrics.setOldestScnDetails(minCacheScn, minCacheScnChangeTime);
metrics.setOffsetScn(offsetContext.getScn());
// optionally dispatch a heartbeat event
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
return endScn;
}
}
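A worked numeric example of the non-LOB branch: suppose the oldest in-flight transaction started at SCN 4100, the mined window ended at SCN 4200, but the last row actually read had SCN 4150. The offset is pinned one below the oldest open transaction so its events are re-mined after a restart, while the returned value trims the window to what was really read. A self-contained walkthrough with plain longs standing in for Scn (0 plays the role of Scn.NULL):

    public class StartScnWalkthrough {
        public static void main(String[] args) {
            long minCacheScn = 4_100;      // start SCN of oldest cached transaction
            long endScn = 4_200;           // upper bound of the mined window
            long lastProcessedScn = 4_150; // last row seen in the result set

            // "last processed SCN is before the endScn" branch above:
            // the LGWR buffer may not have flushed everything yet.
            if (lastProcessedScn != 0 && lastProcessedScn < endScn) {
                endScn = lastProcessedScn; // 4150
            }

            // Offset is pinned one below the oldest open transaction.
            long offsetScn = (minCacheScn != 0) ? minCacheScn - 1 : endScn;

            System.out.println("offset=" + offsetScn + ", nextStart=" + endScn);
            // Prints: offset=4099, nextStart=4150
        }
    }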
/**
* Processes the LogMiner results.
@@ -328,6 +444,12 @@ protected void processResults(OraclePartition partition, ResultSet resultSet) th
* @throws InterruptedException if the dispatcher was interrupted sending an event
*/
protected void processRow(OraclePartition partition, LogMinerEventRow row) throws SQLException, InterruptedException {
final String transactionId = row.getTransactionId();
if (isRecentlyProcessed(transactionId)) {
LOGGER.debug("Transaction {} has been seen by connector, skipped.", transactionId);
return;
}
if (!row.getEventType().equals(EventType.MISSING_SCN)) {
lastProcessedScn = row.getScn();
lastProcessedScnChangeTime = row.getChangeTime();
@@ -459,6 +581,8 @@ else if (transaction != null && !isRecentlyProcessed(transactionId)) {
protected void resetTransactionToStart(T transaction) {
transaction.start();
// Flush the change created by the super class to the transaction cache
getTransactionCache().put(transaction.getTransactionId(), transaction);
}
/**
@@ -656,13 +780,25 @@ protected void handleRollbackNotFoundInBuffer(LogMinerEventRow row) {
abandonedTransactionsCache.remove(row.getTransactionId());
}
/**
* Purges from the relevant caches all entries that occurred prior to the specified change number.
*
* @param minCacheScn the minimum system change number to retain; older entries are removed
*/
protected void purgeCache(Scn minCacheScn) {
getProcessedTransactionsCache().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0);
getSchemaChangesCache().removeIf(entry -> Scn.valueOf(entry.getKey()).compareTo(minCacheScn) < 0);
}
/**
* Gets a transaction instance from the transaction cache while also removing its cache entry.
*
* @param transactionId the transaction's unique identifier, should not be {@code null}
* @return the transaction instance if found, {@code null} if the transaction wasn't found
*/
protected abstract T getAndRemoveTransactionFromCache(String transactionId);
protected T getAndRemoveTransactionFromCache(String transactionId) {
return getTransactionCache().remove(transactionId);
}
/**
* Removes the items associated with the transaction (e.g. events if they are stored independently).
@@ -677,6 +813,7 @@ protected void cleanupAfterTransactionRemovedFromCache(T transaction, boolean is
else {
abandonedTransactionsCache.remove(transaction.getTransactionId());
}
removeEventsWithTransaction(transaction);
}
/**
@@ -685,7 +822,39 @@ protected void cleanupAfterTransactionRemovedFromCache(T transaction, boolean is
* @param transaction the transaction instance, should never be {@code null}
* @return an iterator over the transaction's events, never {@code null}
*/
protected abstract Iterator<LogMinerEvent> getTransactionEventIterator(T transaction);
protected Iterator<LogMinerEvent> getTransactionEventIterator(T transaction) {
return new Iterator<>() {
private final int count = transaction.getNumberOfEvents();
private LogMinerEvent nextEvent;
private int index = 0;
@Override
public boolean hasNext() {
while (index < count) {
nextEvent = getEventCache().get(transaction.getEventId(index));
if (nextEvent == null) {
LOGGER.debug("Event {} must have been undone, skipped.", index);
// There are situations where an event will be removed from the cache when it is
// undone by the undo-row flag. The event id isn't re-used in this use case so
// the iterator automatically detects null entries and skips them by advancing
// to the next entry until either we've reached the number of events or detected
// a non-null entry available for return
index++;
continue;
}
break;
}
return index < count;
}
@Override
public LogMinerEvent next() {
index++;
return nextEvent;
}
};
}
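The iterator above tolerates holes left by undone events: getEventCache().get(...) returns null for a removed event id, and hasNext() simply advances past it. A self-contained mock demonstrating the same skip behavior, with a HashMap standing in for the event cache:

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    public class SparseEventIteratorDemo {
        public static void main(String[] args) {
            // Transaction "tx1" allocated 4 event slots, but slot 2 was undone.
            Map<String, String> eventCache = new HashMap<>();
            eventCache.put("tx1-0", "INSERT");
            eventCache.put("tx1-1", "UPDATE");
            // "tx1-2" was removed by an undo row
            eventCache.put("tx1-3", "DELETE");

            Iterator<String> it = new Iterator<>() {
                private final int count = 4; // transaction.getNumberOfEvents()
                private String nextEvent;
                private int index = 0;

                @Override
                public boolean hasNext() {
                    while (index < count) {
                        nextEvent = eventCache.get("tx1-" + index);
                        if (nextEvent == null) {
                            index++; // skip the undone slot
                            continue;
                        }
                        break;
                    }
                    return index < count;
                }

                @Override
                public String next() {
                    index++;
                    return nextEvent;
                }
            };

            while (it.hasNext()) {
                System.out.println(it.next()); // INSERT, UPDATE, DELETE
            }
        }
    }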
/**
* Finalizes the commit of a transaction.
@@ -693,14 +862,23 @@ protected void cleanupAfterTransactionRemovedFromCache(T transaction, boolean is
* @param transactionId the transaction's unique identifier, should not be {@code null}
* @param commitScn the transaction's system change number, should not be {@code null}
*/
protected abstract void finalizeTransactionCommit(String transactionId, Scn commitScn);
protected void finalizeTransactionCommit(String transactionId, Scn commitScn) {
getAbandonedTransactionsCache().remove(transactionId);
// cache recently committed transactions by transaction id
if (getConfig().isLobEnabled()) {
getProcessedTransactionsCache().put(transactionId, commitScn.toString());
}
}
/**
* Returns only the first transaction id in the transaction buffer.
*
* @return the first active transaction in the buffer, or {@code null} if there is none.
*/
protected abstract String getFirstActiveTransactionKey();
protected String getFirstActiveTransactionKey() {
return getTransactionCache()
.streamAndReturn(stream -> stream.map(LogMinerCache.Entry::getKey).findFirst()).orElse(null);
}
/**
* Check whether the supplied username associated with the specified transaction is excluded.
@@ -748,7 +926,17 @@ protected void handleRollback(LogMinerEventRow row) {
* @param transactionId the unique transaction identifier, never {@code null}
* @param rollbackScn the rollback transaction's system change number, never {@code null}
*/
protected abstract void finalizeTransactionRollback(String transactionId, Scn rollbackScn);
protected void finalizeTransactionRollback(String transactionId, Scn rollbackScn) {
final T transaction = getTransactionCache().get(transactionId);
if (transaction != null) {
removeEventsWithTransaction(transaction);
getTransactionCache().remove(transactionId);
}
getAbandonedTransactionsCache().remove(transactionId);
if (getConfig().isLobEnabled()) {
getProcessedTransactionsCache().put(transactionId, rollbackScn.toString());
}
}
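Note the interplay with isRecentlyProcessed(...): when LOB support is enabled, both commit and rollback record the transaction id in the processed-transactions cache, so any late-arriving rows for that id are discarded at the top of processRow(...). A minimal sketch of that guard, using a HashMap in place of the cache:

    import java.util.HashMap;
    import java.util.Map;

    public class RecentlyProcessedGuardDemo {
        public static void main(String[] args) {
            Map<String, String> processedTransactionsCache = new HashMap<>();

            // finalizeTransactionCommit / finalizeTransactionRollback (LOB enabled)
            processedTransactionsCache.put("tx42", "5001"); // commit or rollback SCN

            // processRow(...) for a late event of the same transaction:
            String transactionId = "tx42";
            if (processedTransactionsCache.containsKey(transactionId)) {
                System.out.println("Transaction " + transactionId
                        + " has been seen by connector, skipped.");
                return;
            }
            System.out.println("would buffer event for " + transactionId);
        }
    }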
/**
* Handle processing a LogMinerEventRow for a {@code DDL} event.
@@ -822,6 +1010,10 @@ else if (activeTransactions == 1) {
offsetContext.setRsId(row.getRsId());
offsetContext.setRowId("");
if (getConfig().isLobEnabled()) {
getSchemaChangesCache().put(row.getScn().toString(), row.getTableId().identifier());
}
dispatcher.dispatchSchemaChangeEvent(partition, offsetContext,
tableId,
new OracleSchemaChangeEventEmitter(
@@ -1321,11 +1513,44 @@ else if (sequenceUnavailable) {
/**
* Add a transaction to the transaction map if allowed.
*
* @param transactionId the unqiue transaction id
* @param transactionId the unique transaction id
* @param row the LogMiner event row
* @param eventSupplier the supplier of the event to create if the event is allowed to be added
*/
protected abstract void addToTransaction(String transactionId, LogMinerEventRow row, Supplier<LogMinerEvent> eventSupplier);
protected void addToTransaction(String transactionId, LogMinerEventRow row, Supplier<LogMinerEvent> eventSupplier) {
if (getAbandonedTransactionsCache().contains(transactionId)) {
LOGGER.warn("Event for abandoned transaction {}, skipped.", transactionId);
return;
}
if (isRecentlyProcessed(transactionId)) {
LOGGER.warn("Event for transaction {} skipped as transaction has been processed.", transactionId);
return;
}
T transaction = getTransactionCache().get(transactionId);
if (transaction == null) {
LOGGER.trace("Transaction {} is not in cache, creating.", transactionId);
transaction = createTransaction(row);
}
if (isTransactionOverEventThreshold(transaction)) {
abandonTransactionOverEventThreshold(transaction);
return;
}
final String eventKey = transaction.getEventId(transaction.getNextEventId());
if (!getEventCache().containsKey(eventKey)) {
// Add new event at eventId offset
LOGGER.trace("Transaction {}, adding event reference at key {}", transactionId, eventKey);
getEventCache().put(eventKey, eventSupplier.get());
metrics.calculateLagFromSource(row.getChangeTime());
inMemoryPendingTransactionsCache.putOrIncrement(transaction.getTransactionId());
}
// When using Infinispan, this extra put is required so that the state is properly synchronized
getTransactionCache().put(transactionId, transaction);
metrics.setActiveTransactionCount(getTransactionCache().size());
}
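The closing put(transactionId, transaction) matters for serializing providers such as Infinispan: get(...) can hand back a deserialized copy, so mutations like the advanced event id only become visible to subsequent reads once the value is written back. A small self-contained illustration of that copy-on-read behavior, with serialization simulated by cloning:

    import java.util.HashMap;
    import java.util.Map;

    public class WriteBackDemo {
        // Simulates a cache whose get() returns a copy, as a serializing store does.
        static class CopyingCache {
            private final Map<String, int[]> store = new HashMap<>();

            void put(String key, int[] value) {
                store.put(key, value.clone());
            }

            int[] get(String key) {
                int[] value = store.get(key);
                return value == null ? null : value.clone();
            }
        }

        public static void main(String[] args) {
            CopyingCache cache = new CopyingCache();
            cache.put("tx1", new int[]{ 0 });        // nextEventId = 0

            int[] tx = cache.get("tx1");
            tx[0]++;                                 // mutate the local copy only

            System.out.println(cache.get("tx1")[0]); // still 0: change not visible
            cache.put("tx1", tx);                    // the "extra put" writes it back
            System.out.println(cache.get("tx1")[0]); // now 1
        }
    }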
/**
* Dispatch a schema change event for a new table and get the newly created relational table model.
@@ -1491,14 +1716,22 @@ private static class ParsedLobWriteSql {
* Gets the minimum system change number stored in the transaction cache.
* @return the minimum system change number, never {@code null} but could be {@link Scn#NULL}.
*/
protected abstract Scn getTransactionCacheMinimumScn();
protected Scn getTransactionCacheMinimumScn() {
return getTransactionCache().streamAndReturn(stream -> stream.map(LogMinerCache.Entry::getValue)
.map(Transaction::getStartScn)
.min(Scn::compareTo)
.orElse(Scn.NULL));
}
/**
* Get the oldest transaction in the cache.
*
* @return an Optional containing the oldest transaction in the cache, or an empty Optional if the cache is empty
*/
protected abstract Optional<T> getOldestTransactionInCache();
protected Optional<T> getOldestTransactionInCache() {
return getTransactionCache().streamAndReturn(stream -> stream.map(LogMinerCache.Entry::getValue)
.min(this::oldestTransactionComparison));
}
/**
* Returns whether the transaction id has no sequence number component.
@@ -1663,6 +1896,16 @@ protected Optional<Scn> getLastScnToAbandon(OracleConnection connection, Duratio
}
}
protected List<String> getTransactionKeysWithPrefix(String prefix) {
// Enforce that the keys are always reverse sorted.
return getEventCache()
.streamAndReturn(stream -> stream.map(LogMinerCache.Entry::getKey)
.filter(k -> k.startsWith(prefix))
.sorted(EventKeySortComparator.INSTANCE.reversed())
.collect(Collectors.toList()) // must use Collectors.toList to avoid bug in ISPN for now
);
}
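keys(...), streamAndReturn(...), and removeIf(...) all come from the LogMinerCache abstraction, which exposes cache contents as streams so a provider can avoid materializing full key sets. A hedged sketch of the slice of that interface exercised in this file, plus a trivial heap-backed implementation for experimentation; the method names match the calls above, but the entry type and resource handling are assumptions (the real interface defines its own Entry type):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Consumer;
    import java.util.function.Function;
    import java.util.function.Predicate;
    import java.util.stream.Stream;

    // Sketch of the LogMinerCache operations used in this class.
    interface LogMinerCacheSketch<K, V> {
        void put(K key, V value);
        V get(K key);
        V remove(K key);
        boolean containsKey(K key);
        int size();
        boolean isEmpty();
        void removeIf(Predicate<Map.Entry<K, V>> predicate);
        void keys(Consumer<Stream<K>> consumer);
        <R> R streamAndReturn(Function<Stream<Map.Entry<K, V>>, R> function);
    }

    // Minimal heap-backed implementation.
    class MapBackedCacheSketch<K, V> implements LogMinerCacheSketch<K, V> {
        private final Map<K, V> map = new HashMap<>();

        public void put(K key, V value) { map.put(key, value); }
        public V get(K key) { return map.get(key); }
        public V remove(K key) { return map.remove(key); }
        public boolean containsKey(K key) { return map.containsKey(key); }
        public int size() { return map.size(); }
        public boolean isEmpty() { return map.isEmpty(); }

        public void removeIf(Predicate<Map.Entry<K, V>> predicate) {
            map.entrySet().removeIf(predicate);
        }

        // Streams are handed to callbacks rather than returned so providers
        // can close underlying resources when the callback finishes.
        public void keys(Consumer<Stream<K>> consumer) {
            try (Stream<K> stream = map.keySet().stream()) {
                consumer.accept(stream);
            }
        }

        public <R> R streamAndReturn(Function<Stream<Map.Entry<K, V>>, R> function) {
            try (Stream<Map.Entry<K, V>> stream = map.entrySet().stream()) {
                return function.apply(stream);
            }
        }
    }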
/**
* Calculates the last system change number to abandon by directly examining the transaction buffer
* cache and comparing the transaction start time to the most recent last processed change time and
@@ -1694,6 +1937,28 @@ private Scn getLastScnToAbandonFallbackByTransactionChangeTime(Duration retentio
});
}
private void removeEventsWithTransaction(T transaction) {
// Clear the event queue for the transaction
for (int i = 0; i < transaction.getNumberOfEvents(); ++i) {
getEventCache().remove(transaction.getEventId(i));
}
inMemoryPendingTransactionsCache.remove(transaction.getTransactionId());
}
private void removeEvents(LogMinerEventRow row, List<String> eventKeys) {
for (String eventKey : eventKeys) {
final LogMinerEvent event = getEventCache().get(eventKey);
if (event != null && event.getRowId().equals(row.getRowId())) {
Loggings.logDebugAndTraceRecord(LOGGER, row, "Undo change on table '{}' applied to transaction '{}'", row.getTableId(), eventKey);
getEventCache().remove(eventKey);
inMemoryPendingTransactionsCache.decrement(row.getTransactionId());
return;
}
}
Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found.", row.getTableId(),
row.getRowId());
}
protected int compareStartScn(T first, T second) {
return first.getStartScn().compareTo(second.getStartScn());
}
@@ -1750,4 +2015,33 @@ public String toString() {
'}';
}
}
/**
* A comparator that sorts event keys by their numeric event component rather than as
* plain strings, where a plain string sort would allow "100" to come before "9".
*/
private static class EventKeySortComparator implements Comparator<String> {
public static final EventKeySortComparator INSTANCE = new EventKeySortComparator();
@Override
public int compare(String o1, String o2) {
if (o1 == null || !o1.contains("-")) {
throw new IllegalStateException("Event Key must be in the format of <transaction>-<event>");
}
if (o2 == null || !o2.contains("-")) {
throw new IllegalStateException("Event Key must be in the format of <transaction>-<event>");
}
final String[] s1 = o1.split("-");
final String[] s2 = o2.split("-");
// Compare transaction ids, these should generally be identical.
int result = s1[0].compareTo(s2[0]);
if (result == 0) {
result = Long.compare(Long.parseLong(s1[1]), Long.parseLong(s2[1]));
}
return result;
}
}
}
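The effect of the comparator is easiest to see on a few keys: a plain string sort misorders multi-digit event numbers, while the numeric comparison restores event order, and getTransactionKeysWithPrefix then applies it reversed. A self-contained demo using an equivalent comparator built from java.util.Comparator:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class EventKeyOrderingDemo {
        public static void main(String[] args) {
            List<String> keys = new ArrayList<>(List.of("tx1-2", "tx1-10", "tx1-9"));

            // Plain string sort: "10" sorts before "2" and "9".
            keys.sort(Comparator.naturalOrder());
            System.out.println(keys); // [tx1-10, tx1-2, tx1-9]

            // Numeric sort on the event component, as EventKeySortComparator does.
            Comparator<String> numeric = Comparator
                    .comparing((String k) -> k.split("-")[0])
                    .thenComparingLong(k -> Long.parseLong(k.split("-")[1]));

            keys.sort(numeric);
            System.out.println(keys); // [tx1-2, tx1-9, tx1-10]

            // Reversed, as getTransactionKeysWithPrefix requests.
            keys.sort(numeric.reversed());
            System.out.println(keys); // [tx1-10, tx1-9, tx1-2]
        }
    }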

View File: AbstractTransactionCachingLogMinerEventProcessor.java (deleted)

@@ -1,433 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
import io.debezium.relational.TableId;
import io.debezium.util.Loggings;
/**
* An implementation of {@link LogMinerEventProcessor}
* that uses Infinispan to persist the transaction cache across restarts on disk.
*
* @author Chris Cranford
*/
public abstract class AbstractTransactionCachingLogMinerEventProcessor<T extends Transaction> extends AbstractLogMinerEventProcessor<T> implements CacheProvider<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTransactionCachingLogMinerEventProcessor.class);
private final OracleConnection jdbcConnection;
private final LogMinerStreamingChangeEventSourceMetrics metrics;
private final OraclePartition partition;
private final OracleOffsetContext offsetContext;
private final EventDispatcher<OraclePartition, TableId> dispatcher;
private final InMemoryPendingTransactionsCache inMemoryPendingTransactionsCache = new InMemoryPendingTransactionsCache();
protected AbstractTransactionCachingLogMinerEventProcessor(
ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
OracleConnection jdbcConnection,
EventDispatcher<OraclePartition, TableId> dispatcher,
OraclePartition partition,
OracleOffsetContext offsetContext,
OracleDatabaseSchema schema,
LogMinerStreamingChangeEventSourceMetrics metrics) {
super(context, connectorConfig, schema, partition, offsetContext, dispatcher, metrics, jdbcConnection);
this.jdbcConnection = jdbcConnection;
this.metrics = metrics;
this.partition = partition;
this.offsetContext = offsetContext;
this.dispatcher = dispatcher;
}
protected void reCreateInMemoryCache() {
getTransactionCache().keys(trStream -> {
trStream.forEach(tr -> {
getEventCache().keys(eventStream -> {
int count = (int) eventStream.filter(e -> e.startsWith(tr + "-")).count();
LOGGER.info("Re-creating in memory cache of event count for transaction '" + tr + "'. No of events found: " + count);
inMemoryPendingTransactionsCache.initKey(tr, count);
});
});
});
}
@Override
public void displayCacheStatistics() {
LOGGER.info("Overall Cache Statistics:");
LOGGER.info("\tTransactions : {}", getTransactionCache().size());
LOGGER.info("\tRecent Transactions : {}", getProcessedTransactionsCache().size());
LOGGER.info("\tSchema Changes : {}", getSchemaChangesCache().size());
LOGGER.info("\tEvents : {}", getEventCache().size());
if (!getEventCache().isEmpty() && LOGGER.isDebugEnabled()) {
getEventCache().keys(stream -> {
stream.forEach(eventKey -> LOGGER.debug("\t\tFound Key: {}", eventKey));
});
}
}
@Override
protected boolean isRecentlyProcessed(String transactionId) {
return getProcessedTransactionsCache().containsKey(transactionId);
}
// from EmbeddedInfinispanLogMinerEventProcessor, impl is different
@Override
protected Scn getTransactionCacheMinimumScn() {
return getTransactionCache().streamAndReturn(stream -> stream.map(LogMinerCache.Entry::getValue)
.map(Transaction::getStartScn)
.min(Scn::compareTo)
.orElse(Scn.NULL));
}
// from EmbeddedInfinispanLogMinerEventProcessor, impl is different
protected Optional<T> getOldestTransactionInCache() {
return getTransactionCache().streamAndReturn(stream -> stream.map(LogMinerCache.Entry::getValue)
.min(this::oldestTransactionComparison));
}
// from EmbeddedInfinispanLogMinerEventProcessor, impl is different
@Override
protected String getFirstActiveTransactionKey() {
return getTransactionCache()
.streamAndReturn(stream -> stream.map(LogMinerCache.Entry::getKey).findFirst()).orElse(null);
}
// this impl is different
@Override
protected void removeEventWithRowId(LogMinerEventRow row) {
// locate the events based solely on XIDUSN and XIDSLT.
String basePrefix = getTransactionIdPrefix(row.getTransactionId());
List<String> eventKeysForBasePrefix = getTransactionKeysWithPrefix(basePrefix);
String transactionIdPrefix = row.getTransactionId() + "-";
// filter the existing list down to the events for the transaction
List<String> eventKeys = eventKeysForBasePrefix.stream()
.filter(k -> k.startsWith(transactionIdPrefix))
.toList();
if (eventKeys.isEmpty() && isTransactionIdWithNoSequence(row.getTransactionId())) {
// This means that Oracle LogMiner found an event that should be undone but its corresponding
// undo entry was read in a prior mining session and the transaction's sequence could not be
// resolved.
LOGGER.debug("Undo change refers to a transaction that has no explicit sequence, '{}'", row.getTransactionId());
LOGGER.debug("Checking all transactions with prefix '{}'", basePrefix);
eventKeys = eventKeysForBasePrefix;
Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found.", row.getTableId(),
row.getRowId());
}
if (!eventKeys.isEmpty()) {
removeEvents(row, eventKeys);
}
else if (!getConfig().isLobEnabled()) {
Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since transaction '{}' was not found.", row.getTableId(),
row.getTransactionId());
}
}
protected List<String> getTransactionKeysWithPrefix(String prefix) {
// Enforce that the keys are always reverse sorted.
return getEventCache()
.streamAndReturn(stream -> stream.map(LogMinerCache.Entry::getKey)
.filter(k -> k.startsWith(prefix))
.sorted(EventKeySortComparator.INSTANCE.reversed())
.collect(Collectors.toList()) // must use Collectors.toList to avoid bug in ISPN for now
);
}
@Override
protected void processRow(OraclePartition partition, LogMinerEventRow row) throws SQLException, InterruptedException {
final String transactionId = row.getTransactionId();
if (isRecentlyProcessed(transactionId)) {
LOGGER.debug("Transaction {} has been seen by connector, skipped.", transactionId);
return;
}
super.processRow(partition, row);
}
@Override
protected boolean hasSchemaChangeBeenSeen(LogMinerEventRow row) {
return getSchemaChangesCache().containsKey(row.getScn().toString());
}
// different from AbstractInfinispanLogMinerEventProcessor
@Override
protected T getAndRemoveTransactionFromCache(String transactionId) {
return getTransactionCache().remove(transactionId);
}
@Override
protected void cleanupAfterTransactionRemovedFromCache(T transaction, boolean isAbandoned) {
super.cleanupAfterTransactionRemovedFromCache(transaction, isAbandoned);
removeEventsWithTransaction(transaction);
}
@Override
protected Iterator<LogMinerEvent> getTransactionEventIterator(T transaction) {
return new Iterator<>() {
private final int count = transaction.getNumberOfEvents();
private LogMinerEvent nextEvent;
private int index = 0;
@Override
public boolean hasNext() {
while (index < count) {
nextEvent = getEventCache().get(transaction.getEventId(index));
if (nextEvent == null) {
LOGGER.debug("Event {} must have been undone, skipped.", index);
// There are situations where an event will be removed from the cache when it is
// undone by the undo-row flag. The event id isn't re-used in this use case so
// the iterator automatically detects null entries and skips them by advancing
// to the next entry until either we've reached the number of events or detected
// a non-null entry available for return
index++;
continue;
}
break;
}
return index < count;
}
@Override
public LogMinerEvent next() {
index++;
return nextEvent;
}
};
}
@Override
protected void finalizeTransactionCommit(String transactionId, Scn commitScn) {
getAbandonedTransactionsCache().remove(transactionId);
// cache recently committed transactions by transaction id
if (getConfig().isLobEnabled()) {
getProcessedTransactionsCache().put(transactionId, commitScn.toString());
}
}
@Override
protected void finalizeTransactionRollback(String transactionId, Scn rollbackScn) {
final T transaction = getTransactionCache().get(transactionId);
if (transaction != null) {
removeEventsWithTransaction(transaction);
getTransactionCache().remove(transactionId);
}
getAbandonedTransactionsCache().remove(transactionId);
if (getConfig().isLobEnabled()) {
getProcessedTransactionsCache().put(transactionId, rollbackScn.toString());
}
}
@Override
protected void resetTransactionToStart(T transaction) {
super.resetTransactionToStart(transaction);
// Flush the change created by the super class to the transaction cache
getTransactionCache().put(transaction.getTransactionId(), transaction);
}
@Override
protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedException {
super.handleSchemaChange(row);
if (row.getTableName() != null && getConfig().isLobEnabled()) {
getSchemaChangesCache().put(row.getScn().toString(), row.getTableId().identifier());
}
}
@Override
protected void addToTransaction(String transactionId, LogMinerEventRow row, Supplier<LogMinerEvent> eventSupplier) {
if (getAbandonedTransactionsCache().contains(transactionId)) {
LOGGER.warn("Event for abandoned transaction {}, skipped.", transactionId);
return;
}
if (!isRecentlyProcessed(transactionId)) {
T transaction = getTransactionCache().get(transactionId);
if (transaction == null) {
LOGGER.trace("Transaction {} is not in cache, creating.", transactionId);
transaction = createTransaction(row);
}
if (isTransactionOverEventThreshold(transaction)) {
abandonTransactionOverEventThreshold(transaction);
return;
}
String eventKey = transaction.getEventId(transaction.getNextEventId());
if (!getEventCache().containsKey(eventKey)) {
// Add new event at eventId offset
LOGGER.trace("Transaction {}, adding event reference at key {}", transactionId, eventKey);
getEventCache().put(eventKey, eventSupplier.get());
metrics.calculateLagFromSource(row.getChangeTime());
inMemoryPendingTransactionsCache.putOrIncrement(transaction.getTransactionId());
}
// When using Infinispan, this extra put is required so that the state is properly synchronized
getTransactionCache().put(transactionId, transaction);
metrics.setActiveTransactionCount(getTransactionCache().size());
}
else {
LOGGER.warn("Event for transaction {} skipped as transaction has been processed.", transactionId);
}
}
@Override
protected int getTransactionEventCount(T transaction) {
return inMemoryPendingTransactionsCache.getNumPending(transaction.getTransactionId());
}
@Override
protected PreparedStatement createQueryStatement() throws SQLException {
return jdbcConnection.connection().prepareStatement(getQueryString(),
ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY,
ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
@Override
protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws InterruptedException {
// Cleanup caches based on current state of the transaction cache
final Optional<T> oldestTransaction = getOldestTransactionInCache();
final Scn minCacheScn;
final Instant minCacheScnChangeTime;
if (oldestTransaction.isPresent()) {
minCacheScn = oldestTransaction.get().getStartScn();
minCacheScnChangeTime = oldestTransaction.get().getChangeTime();
}
else {
minCacheScn = Scn.NULL;
minCacheScnChangeTime = null;
}
if (!minCacheScn.isNull()) {
abandonTransactions(getConfig().getLogMiningTransactionRetention());
purgeCache(minCacheScn);
}
else {
getSchemaChangesCache().removeIf(e -> true);
}
if (getConfig().isLobEnabled()) {
if (getTransactionCache().isEmpty() && !maxCommittedScn.isNull()) {
offsetContext.setScn(maxCommittedScn);
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
}
else {
if (!minCacheScn.isNull()) {
getProcessedTransactionsCache().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0);
offsetContext.setScn(minCacheScn.subtract(Scn.valueOf(1)));
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
}
}
return offsetContext.getScn();
}
else {
if (!getLastProcessedScn().isNull() && getLastProcessedScn().compareTo(endScn) < 0) {
// If the last processed SCN is before the endScn we need to use the last processed SCN as the
// next starting point as the LGWR buffer didn't flush all entries from memory to disk yet.
endScn = getLastProcessedScn();
}
offsetContext.setScn(minCacheScn.isNull() ? endScn : minCacheScn.subtract(Scn.valueOf(1)));
metrics.setOldestScnDetails(minCacheScn, minCacheScnChangeTime);
metrics.setOffsetScn(offsetContext.getScn());
// optionally dispatch a heartbeat event
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
return endScn;
}
}
/**
* Purges from the relevant caches all entries that occurred prior to the specified change number.
*
* @param minCacheScn the minimum system change number to retain; older entries are removed
*/
protected void purgeCache(Scn minCacheScn) {
getProcessedTransactionsCache().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0);
getSchemaChangesCache().removeIf(entry -> Scn.valueOf(entry.getKey()).compareTo(minCacheScn) < 0);
}
private void removeEventsWithTransaction(T transaction) {
// Clear the event queue for the transaction
for (int i = 0; i < transaction.getNumberOfEvents(); ++i) {
getEventCache().remove(transaction.getEventId(i));
}
inMemoryPendingTransactionsCache.remove(transaction.getTransactionId());
}
private void removeEvents(LogMinerEventRow row, List<String> eventKeys) {
for (String eventKey : eventKeys) {
final LogMinerEvent event = getEventCache().get(eventKey);
if (event != null && event.getRowId().equals(row.getRowId())) {
Loggings.logDebugAndTraceRecord(LOGGER, row, "Undo change on table '{}' applied to transaction '{}'", row.getTableId(), eventKey);
getEventCache().remove(eventKey);
inMemoryPendingTransactionsCache.decrement(row.getTransactionId());
return;
}
}
Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found.", row.getTableId(),
row.getRowId());
}
/**
* A comparator that sorts event keys by their numeric event component rather than as
* plain strings, where a plain string sort would allow "100" to come before "9".
*/
private static class EventKeySortComparator implements Comparator<String> {
public static final EventKeySortComparator INSTANCE = new EventKeySortComparator();
@Override
public int compare(String o1, String o2) {
if (o1 == null || !o1.contains("-")) {
throw new IllegalStateException("Event Key must be in the format of <transaction>-<event>");
}
if (o2 == null || !o2.contains("-")) {
throw new IllegalStateException("Event Key must be in the format of <transaction>-<event>");
}
final String[] s1 = o1.split("-");
final String[] s2 = o2.split("-");
// Compare transaction ids, these should generally be identical.
int result = s1[0].compareTo(s2[0]);
if (result == 0) {
result = Long.compare(Long.parseLong(s1[1]), Long.parseLong(s2[1]));
}
return result;
}
}
}
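With the buffering logic folded into AbstractLogMinerEventProcessor, what this deleted class used to own now reaches concrete processors through the CacheProvider<T> interface the base class implements (see the class declaration in the first file). A sketch of the accessor contract this commit leans on, inferred from the calls above and reusing the hypothetical LogMinerCacheSketch from earlier; the names mirror the code, the value types are assumptions:

    // Inferred shape of the CacheProvider contract used by the folded base class.
    interface CacheProviderSketch<T> {
        // transaction id -> in-flight transaction
        LogMinerCacheSketch<String, T> getTransactionCache();
        // "<transactionId>-<eventIndex>" -> buffered LogMiner event
        LogMinerCacheSketch<String, Object> getEventCache();
        // transaction id -> commit/rollback SCN, guards isRecentlyProcessed(...)
        LogMinerCacheSketch<String, String> getProcessedTransactionsCache();
        // SCN -> table identifier, guards hasSchemaChangeBeenSeen(...)
        LogMinerCacheSketch<String, String> getSchemaChangesCache();
        // Logs the cache sizes, as displayCacheStatistics() above does.
        void displayCacheStatistics();
    }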

View File: EhcacheLogMinerEventProcessor.java

@@ -38,19 +38,19 @@
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.processor.AbstractTransactionCachingLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.LogMinerCache;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
import io.debezium.relational.TableId;
/**
* An {@link AbstractTransactionCachingLogMinerEventProcessor} implementation for storing buffer details
* off-heap in a set of Ehcache-backed caches.
* An {@link AbstractLogMinerEventProcessor} implementation for storing buffer details off-heap in a
* set of Ehcache-backed caches.
*
* @author Chris Cranford
*/
public class EhcacheLogMinerEventProcessor extends AbstractTransactionCachingLogMinerEventProcessor<EhcacheTransaction> {
public class EhcacheLogMinerEventProcessor extends AbstractLogMinerEventProcessor<EhcacheTransaction> {
private static final Logger LOGGER = LoggerFactory.getLogger(EhcacheLogMinerEventProcessor.class);
@@ -69,7 +69,7 @@ public EhcacheLogMinerEventProcessor(ChangeEventSourceContext context,
OracleOffsetContext offsetContext,
OracleDatabaseSchema schema,
LogMinerStreamingChangeEventSourceMetrics metrics) {
super(context, connectorConfig, jdbcConnection, dispatcher, partition, offsetContext, schema, metrics);
super(context, connectorConfig, schema, partition, offsetContext, dispatcher, metrics, jdbcConnection);
LOGGER.info("Using Ehcache buffer");
this.cacheManager = createCacheManager(connectorConfig);

View File: AbstractInfinispanLogMinerEventProcessor.java

@@ -12,7 +12,7 @@
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.processor.AbstractTransactionCachingLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.CacheProvider;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
@@ -24,7 +24,7 @@
*
* @author Chris Cranford
*/
public abstract class AbstractInfinispanLogMinerEventProcessor extends AbstractTransactionCachingLogMinerEventProcessor<InfinispanTransaction>
public abstract class AbstractInfinispanLogMinerEventProcessor extends AbstractLogMinerEventProcessor<InfinispanTransaction>
implements CacheProvider<InfinispanTransaction> {
protected AbstractInfinispanLogMinerEventProcessor(ChangeEventSourceContext context,
@@ -35,7 +35,7 @@ protected AbstractInfinispanLogMinerEventProcessor(ChangeEventSourceContext cont
OracleOffsetContext offsetContext,
OracleDatabaseSchema schema,
LogMinerStreamingChangeEventSourceMetrics metrics) {
super(context, connectorConfig, jdbcConnection, dispatcher, partition, offsetContext, schema, metrics);
super(context, connectorConfig, schema, partition, offsetContext, dispatcher, metrics, jdbcConnection);
}
@Override

View File: MemoryLogMinerEventProcessor.java

@@ -13,7 +13,7 @@
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.processor.AbstractTransactionCachingLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.LogMinerCache;
import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor;
import io.debezium.pipeline.EventDispatcher;
@@ -26,7 +26,7 @@
*
* @author Chris Cranford
*/
public class MemoryLogMinerEventProcessor extends AbstractTransactionCachingLogMinerEventProcessor<MemoryTransaction> {
public class MemoryLogMinerEventProcessor extends AbstractLogMinerEventProcessor<MemoryTransaction> {
private final LogMinerCache<String, MemoryTransaction> transactionCache = new MemoryBasedLogMinerCache<>();
private final LogMinerCache<String, LogMinerEvent> eventCache = new MemoryBasedLogMinerCache<>();
@@ -41,7 +41,7 @@ public MemoryLogMinerEventProcessor(ChangeEventSourceContext context,
OracleOffsetContext offsetContext,
OracleDatabaseSchema schema,
LogMinerStreamingChangeEventSourceMetrics metrics) {
super(context, connectorConfig, jdbcConnection, dispatcher, partition, offsetContext, schema, metrics);
super(context, connectorConfig, schema, partition, offsetContext, dispatcher, metrics, jdbcConnection);
}
@Override