DBZ-8054 Change MemoryLogMinerEventProcessor to extend AbstractTransactionCachingLogMinerEventProcessor

Jeremy Ford 2024-07-16 18:06:56 -04:00 committed by Chris Cranford
parent ff828a340e
commit 1be6121416
2 changed files with 22 additions and 277 deletions

AbstractTransactionCachingLogMinerEventProcessor.java

@@ -193,17 +193,6 @@ protected boolean hasSchemaChangeBeenSeen(LogMinerEventRow row) {
     @Override
     protected T getAndRemoveTransactionFromCache(String transactionId) {
-        // // todo: Infinispan bug?
-        // // When interacting with ISPN with a remote server configuration, the expected
-        // // behavior was that calling the remove method on the cache would return the
-        // // existing entry and remove it from the cache; however it always returned null.
-        // //
-        // // For now, we're going to use get to obtain the value and then remove it after-the-fact.
-        // final T transaction = getTransactionCache().get(transactionId);
-        // if (transaction != null) {
-        //     getTransactionCache().remove(transactionId);
-        // }
-        // return transaction;
         return getTransactionCache().remove(transactionId);
     }
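
The comment deleted above documented a get-then-remove workaround for a suspected Infinispan remote-cache bug in which remove(key) returned null instead of the previous entry. The simplified body now relies on the cache honoring the java.util.Map convention, where remove returns the value previously associated with the key. A minimal standalone sketch of that contract, using a plain java.util.Map rather than Debezium's LogMinerCache API:

import java.util.HashMap;
import java.util.Map;

public class RemoveContractSketch {
    public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>();
        cache.put("txn-1", "events");
        // Map.remove returns the previous value (or null if absent), so a
        // separate get before the remove is unnecessary when the backing
        // cache honors this contract.
        String previous = cache.remove("txn-1"); // "events"
        String absent = cache.remove("txn-1");   // null, key already removed
        System.out.println(previous + " / " + absent);
    }
}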

MemoryLogMinerEventProcessor.java

@@ -5,37 +5,22 @@
  */
 package io.debezium.connector.oracle.logminer.processor.memory;
 
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Supplier;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import io.debezium.connector.oracle.OracleConnection;
 import io.debezium.connector.oracle.OracleConnectorConfig;
 import io.debezium.connector.oracle.OracleDatabaseSchema;
 import io.debezium.connector.oracle.OracleOffsetContext;
 import io.debezium.connector.oracle.OraclePartition;
-import io.debezium.connector.oracle.Scn;
 import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics;
 import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
 import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
-import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor;
+import io.debezium.connector.oracle.logminer.processor.AbstractTransactionCachingLogMinerEventProcessor;
 import io.debezium.connector.oracle.logminer.processor.LogMinerCache;
 import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor;
 import io.debezium.pipeline.EventDispatcher;
 import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
 import io.debezium.relational.TableId;
-import io.debezium.util.Loggings;
 
 /**
  * A {@link LogMinerEventProcessor} that uses the JVM heap to store events as they're being
@@ -43,25 +28,15 @@
  *
  * @author Chris Cranford
  */
-// TODO: can this be a caching impl now as well, just with map
-public class MemoryLogMinerEventProcessor extends AbstractLogMinerEventProcessor<MemoryTransaction> {
+public class MemoryLogMinerEventProcessor extends AbstractTransactionCachingLogMinerEventProcessor<MemoryTransaction> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(MemoryLogMinerEventProcessor.class);
 
-    private final EventDispatcher<OraclePartition, TableId> dispatcher;
-    private final OraclePartition partition;
-    private final OracleOffsetContext offsetContext;
-    private final LogMinerStreamingChangeEventSourceMetrics metrics;
-    private final HashMap<String, MemoryTransaction> transactionHashMap = new HashMap<>();
-
-    /**
-     * Cache of transactions, keyed based on the transaction's unique identifier
-     */
-    private final LogMinerCache<String, MemoryTransaction> transactionCache = new MemoryBasedLogMinerCache<>(transactionHashMap);
-
-    /**
-     * Cache of processed transactions (committed or rolled back), keyed based on the transaction's unique identifier.
-     */
-    private final Map<String, Scn> recentlyProcessedTransactionsCache = new HashMap<>();
-    private final Set<Scn> schemaChangesCache = new HashSet<>();
+    private final LogMinerCache<String, MemoryTransaction> transactionCache = new MemoryBasedLogMinerCache<>(new HashMap<>());
+    private final LogMinerCache<String, LogMinerEvent> eventCache = new MemoryBasedLogMinerCache<>(new HashMap<>());
+    private final LogMinerCache<String, String> schemaCache = new MemoryBasedLogMinerCache<>(new HashMap<>());
+    private final LogMinerCache<String, String> processedTransactionsCache = new MemoryBasedLogMinerCache<>(new HashMap<>());
 
     public MemoryLogMinerEventProcessor(ChangeEventSourceContext context,
                                         OracleConnectorConfig connectorConfig,
@@ -71,16 +46,7 @@ public MemoryLogMinerEventProcessor(ChangeEventSourceContext context,
                                         OracleOffsetContext offsetContext,
                                         OracleDatabaseSchema schema,
                                         LogMinerStreamingChangeEventSourceMetrics metrics) {
-        super(context, connectorConfig, schema, partition, offsetContext, dispatcher, metrics, jdbcConnection);
-        this.dispatcher = dispatcher;
-        this.partition = partition;
-        this.offsetContext = offsetContext;
-        this.metrics = (LogMinerStreamingChangeEventSourceMetrics) metrics;
-    }
-
-    @Override
-    public LogMinerCache<String, MemoryTransaction> getTransactionCache() {
-        return transactionCache;
+        super(context, connectorConfig, jdbcConnection, dispatcher, partition, offsetContext, schema, metrics);
     }
 
     @Override
@@ -89,238 +55,28 @@ protected MemoryTransaction createTransaction(LogMinerEventRow row) {
     }
 
     @Override
-    protected void removeEventWithRowId(LogMinerEventRow row) {
-        MemoryTransaction transaction = getTransactionCache().get(row.getTransactionId());
-        if (transaction == null) {
-            if (isTransactionIdWithNoSequence(row.getTransactionId())) {
-                // This means that Oracle LogMiner found an event that should be undone but its corresponding
-                // undo entry was read in a prior mining session and the transaction's sequence could not be
-                // resolved. In this case, lets locate the transaction based solely on XIDUSN and XIDSLT.
-                final String transactionPrefix = getTransactionIdPrefix(row.getTransactionId());
-                LOGGER.debug("Undo change refers to a transaction that has no explicit sequence, '{}'", row.getTransactionId());
-                LOGGER.debug("Checking all transactions with prefix '{}'", transactionPrefix);
-                getTransactionCache().forEach((transactionKey, v) -> {
-                    if (transactionKey.startsWith(transactionPrefix)) {
-                        MemoryTransaction found = getTransactionCache().get(transactionKey);
-                        // TODO: isn't found the same as v?
-                        if (found != v) {
-                            LOGGER.warn("HOW DID THIS HAPPEN?");
-                        }
-                        if (found != null && found.removeEventWithRowId(row.getRowId())) {
-                            // We successfully found a transaction with the same XISUSN and XIDSLT and that
-                            // transaction included a change for the specified row id.
-                            Loggings.logDebugAndTraceRecord(LOGGER, row, "Undo change on table '{}' was applied to transaction '{}'", row.getTableId(), transactionKey);
-                        }
-                    }
-                });
-                Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found", row.getTableId(),
-                        row.getRowId());
-            }
-            else if (!getConfig().isLobEnabled()) {
-                Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since transaction '{}' was not found.", row.getTableId(),
-                        row.getTransactionId());
-            }
-        }
-        else {
-            if (!transaction.removeEventWithRowId(row.getRowId())) {
-                Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found.", row.getTableId(),
-                        row.getRowId());
-            }
-        }
-    }
+    public LogMinerCache<String, MemoryTransaction> getTransactionCache() {
+        return transactionCache;
+    }
+
+    @Override
+    public LogMinerCache<String, LogMinerEvent> getEventCache() {
+        return eventCache;
+    }
+
+    @Override
+    public LogMinerCache<String, String> getSchemaChangesCache() {
+        return schemaCache;
+    }
+
+    @Override
+    public LogMinerCache<String, String> getProcessedTransactionsCache() {
+        return processedTransactionsCache;
+    }
 
     @Override
     public void close() throws Exception {
         // close any resources used here
     }
-
-    @Override
-    protected boolean isRecentlyProcessed(String transactionId) {
-        return recentlyProcessedTransactionsCache.containsKey(transactionId);
-    }
-
-    @Override
-    protected boolean hasSchemaChangeBeenSeen(LogMinerEventRow row) {
-        return schemaChangesCache.contains(row.getScn());
-    }
-
-    @Override
-    protected MemoryTransaction getAndRemoveTransactionFromCache(String transactionId) {
-        return getTransactionCache().remove(transactionId);
-    }
-
-    @Override
-    protected Iterator<LogMinerEvent> getTransactionEventIterator(MemoryTransaction transaction) {
-        return transaction.getEvents().iterator();
-    }
-
-    @Override
-    protected void finalizeTransactionCommit(String transactionId, Scn commitScn) {
-        getAbandonedTransactionsCache().remove(transactionId);
-        if (getConfig().isLobEnabled()) {
-            // cache recently committed transactions by transaction id
-            recentlyProcessedTransactionsCache.put(transactionId, commitScn);
-        }
-    }
-
-    @Override
-    protected void finalizeTransactionRollback(String transactionId, Scn rollbackScn) {
-        transactionCache.remove(transactionId);
-        getAbandonedTransactionsCache().remove(transactionId);
-        if (getConfig().isLobEnabled()) {
-            recentlyProcessedTransactionsCache.put(transactionId, rollbackScn);
-        }
-    }
-
-    @Override
-    protected String getFirstActiveTransactionKey() {
-        AtomicReference<String> result = new AtomicReference<>();
-        transactionCache.keys(keys -> {
-            Iterator<String> iterator = keys.iterator();
-            if (iterator.hasNext()) {
-                result.set(iterator.next());
-            }
-        });
-        return result.get();
-    }
-
-    @Override
-    protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedException {
-        super.handleSchemaChange(row);
-        if (row.getTableName() != null && getConfig().isLobEnabled()) {
-            schemaChangesCache.add(row.getScn());
-        }
-    }
-
-    @Override
-    protected void addToTransaction(String transactionId, LogMinerEventRow row, Supplier<LogMinerEvent> eventSupplier) {
-        if (getAbandonedTransactionsCache().contains(transactionId)) {
-            LOGGER.warn("Event for abandoned transaction {}, skipped.", transactionId);
-            return;
-        }
-        if (!isRecentlyProcessed(transactionId)) {
-            MemoryTransaction transaction = getTransactionCache().get(transactionId);
-            if (transaction == null) {
-                LOGGER.trace("Transaction {} not in cache for DML, creating.", transactionId);
-                transaction = createTransaction(row);
-                getTransactionCache().put(transactionId, transaction);
-            }
-            if (isTransactionOverEventThreshold(transaction)) {
-                abandonTransactionOverEventThreshold(transaction);
-                return;
-            }
-            int eventId = transaction.getNextEventId();
-            if (transaction.getEvents().size() <= eventId) {
-                // Add new event at eventId offset
-                LOGGER.trace("Transaction {}, adding event reference at index {}", transactionId, eventId);
-                transaction.getEvents().add(eventSupplier.get());
-                metrics.calculateLagFromSource(row.getChangeTime());
-            }
-            metrics.setActiveTransactionCount(getTransactionCache().size());
-        }
-        else if (!getConfig().isLobEnabled()) {
-            // Explicitly only log this warning when LobEnabled is false because its commonplace for a
-            // transaction to be re-mined and therefore seen as already processed until the SCN low
-            // watermark is advanced after a long transaction is committed.
-            LOGGER.warn("Event for transaction {} has already been processed, skipped.", transactionId);
-        }
-    }
-
-    @Override
-    protected int getTransactionEventCount(MemoryTransaction transaction) {
-        return transaction.getEvents().size();
-    }
-
-    @Override
-    protected PreparedStatement createQueryStatement() throws SQLException {
-        return jdbcConnection.connection().prepareStatement(getQueryString(),
-                ResultSet.TYPE_FORWARD_ONLY,
-                ResultSet.CONCUR_READ_ONLY,
-                ResultSet.HOLD_CURSORS_OVER_COMMIT);
-    }
-
-    @Override
-    protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws InterruptedException {
-        if (getConfig().isLobEnabled()) {
-            if (transactionCache.isEmpty() && !maxCommittedScn.isNull()) {
-                offsetContext.setScn(maxCommittedScn);
-                dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
-            }
-            else {
-                abandonTransactions(getConfig().getLogMiningTransactionRetention());
-                final Scn minStartScn = getTransactionCacheMinimumScn();
-                if (!minStartScn.isNull()) {
-                    recentlyProcessedTransactionsCache.entrySet().removeIf(entry -> entry.getValue().compareTo(minStartScn) < 0);
-                    schemaChangesCache.removeIf(scn -> scn.compareTo(minStartScn) < 0);
-                    offsetContext.setScn(minStartScn.subtract(Scn.valueOf(1)));
-                    dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
-                }
-            }
-            return offsetContext.getScn();
-        }
-        else {
-            if (!getLastProcessedScn().isNull() && getLastProcessedScn().compareTo(endScn) < 0) {
-                // If the last processed SCN is before the endScn we need to use the last processed SCN as the
-                // next starting point as the LGWR buffer didn't flush all entries from memory to disk yet.
-                endScn = getLastProcessedScn();
-            }
-            if (transactionCache.isEmpty()) {
-                offsetContext.setScn(endScn);
-                dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
-            }
-            else {
-                abandonTransactions(getConfig().getLogMiningTransactionRetention());
-                final Scn minStartScn = getTransactionCacheMinimumScn();
-                if (!minStartScn.isNull()) {
-                    offsetContext.setScn(minStartScn.subtract(Scn.valueOf(1)));
-                    dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
-                }
-            }
-            return endScn;
-        }
-    }
-
-    @Override
-    protected Scn getTransactionCacheMinimumScn() {
-        AtomicReference<Scn> result = new AtomicReference<>();
-        transactionCache.values(stream -> {
-            result.set(stream.map(MemoryTransaction::getStartScn)
-                    .min(Scn::compareTo)
-                    .orElse(Scn.NULL));
-        });
-        return result.get();
-    }
-
-    // TODO: extend cache processor?
-    protected Optional<MemoryTransaction> getOldestTransactionInCache() {
-        return getTransactionCache().streamAndReturn(entryStream -> entryStream
-                .map(LogMinerCache.Entry::getValue)
-                .min(this::compareTransactions));
-    }
-
-    protected MemoryTransaction compareOldest(MemoryTransaction first, MemoryTransaction second) {
-        int comparison = first.getStartScn().compareTo(second.getStartScn());
-        if (comparison < 0) {
-            return second;
-        }
-        else if (comparison == 0) {
-            if (second.getChangeTime().isBefore(first.getChangeTime())) {
-                return second;
-            }
-        }
-        return first;
-    }
-
-    protected int compareTransactions(MemoryTransaction first, MemoryTransaction second) {
-        return first.getStartScn().compareTo(second.getStartScn());
-    }
 }
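
Each of the four caches in the new memory implementation wraps a plain HashMap in a MemoryBasedLogMinerCache, letting the shared AbstractTransactionCachingLogMinerEventProcessor base class treat heap-backed and Infinispan-backed storage uniformly. A rough sketch of how such a heap-backed wrapper could look; this is hypothetical and simplified, and the real LogMinerCache interface seen in the diff also exposes operations such as keys, values, and streamAndReturn:

import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical, simplified heap-backed cache illustrating the delegation
// pattern used by MemoryBasedLogMinerCache; not the actual Debezium class.
public class HeapBackedCacheSketch<K, V> {
    private final Map<K, V> delegate;

    public HeapBackedCacheSketch(Map<K, V> delegate) {
        this.delegate = delegate;
    }

    public V get(K key) {
        return delegate.get(key);
    }

    public void put(K key, V value) {
        delegate.put(key, value);
    }

    // Mirrors java.util.Map.remove: returns the prior value, or null if absent.
    public V remove(K key) {
        return delegate.remove(key);
    }

    public boolean isEmpty() {
        return delegate.isEmpty();
    }

    public int size() {
        return delegate.size();
    }

    public void forEach(BiConsumer<? super K, ? super V> action) {
        delegate.forEach(action);
    }
}

Instantiation mirrors the field initializers in the diff, for example new HeapBackedCacheSketch<>(new HashMap<>()) for each cache.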