DBZ-8054 Reorder methods for easier comparison. Marked methods where impls are different from original.

This commit is contained in:
Jeremy Ford 2024-07-29 21:00:45 -04:00 committed by Chris Cranford
parent c056d7657e
commit 76aa135c0c
2 changed files with 91 additions and 88 deletions

View File

@ -1546,6 +1546,7 @@ protected void abandonTransactionOverEventThreshold(T transaction) {
metrics.incrementOversizedTransactionCount();
}
//this implementation is different
@Override
public void abandonTransactions(Duration retention) throws InterruptedException {
if (!Duration.ZERO.equals(retention)) {
@ -1556,12 +1557,12 @@ public void abandonTransactions(Duration retention) throws InterruptedException
if (!smallestScn.isNull() && thresholdScn.compareTo(smallestScn) >= 0) {
LogMinerCache<String, T> transactionCache = getTransactionCache();
Map<String, T> abandonedT = transactionCache.streamAndReturn(stream -> stream
Map<String, T> abandoned = transactionCache.streamAndReturn(stream -> stream
.filter(e -> e.getValue().getStartScn().compareTo(thresholdScn) <= 0)
.collect(Collectors.toMap(LogMinerCache.Entry::getKey, LogMinerCache.Entry::getValue)));
boolean first = true;
for (Map.Entry<String, T> entry : abandonedT.entrySet()) {
for (Map.Entry<String, T> entry : abandoned.entrySet()) {
if (first) {
LOGGER.warn("All transactions with SCN <= {} will be abandoned.", thresholdScn);
first = false;
@ -1582,7 +1583,7 @@ public void abandonTransactions(Duration retention) throws InterruptedException
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("List of transactions in the cache before transactions being abandoned: [{}]",
String.join(",", abandonedT.keySet()));
String.join(",", abandoned.keySet()));
transactionCache.keys(keys -> LOGGER.debug("List of transactions in the cache after transactions being abandoned: [{}]",
keys.collect(Collectors.joining(","))));

View File

@ -97,6 +97,7 @@ protected boolean isRecentlyProcessed(String transactionId) {
return getProcessedTransactionsCache().containsKey(transactionId);
}
//from EmbeddedInfinispanLogMinerEventProcessor, impl is different
@Override
protected Scn getTransactionCacheMinimumScn() {
return getTransactionCache().streamAndReturn(stream -> stream.map(LogMinerCache.Entry::getValue)
@ -105,17 +106,20 @@ protected Scn getTransactionCacheMinimumScn() {
.orElse(Scn.NULL));
}
//from EmbeddedInfinispanLogMinerEventProcessor, impl is different
protected Optional<T> getOldestTransactionInCache() {
return getTransactionCache().streamAndReturn(stream -> stream.map(LogMinerCache.Entry::getValue)
.min(this::oldestTransactionComparison));
}
//from EmbeddedInfinispanLogMinerEventProcessor, impl is different
@Override
protected String getFirstActiveTransactionKey() {
return getTransactionCache()
.streamAndReturn(stream -> stream.map(LogMinerCache.Entry::getKey).findFirst()).orElse(null);
}
//this impl is different
@Override
protected void removeEventWithRowId(LogMinerEventRow row) {
// locate the events based solely on XIDUSN and XIDSLT.
@ -137,6 +141,9 @@ protected void removeEventWithRowId(LogMinerEventRow row) {
LOGGER.debug("Undo change refers to a transaction that has no explicit sequence, '{}'", row.getTransactionId());
LOGGER.debug("Checking all transactions with prefix '{}'", basePrefix);
eventKeys = eventKeysForBasePrefix;
Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found.", row.getTableId(),
row.getRowId());
}
if (!eventKeys.isEmpty()) {
@ -172,6 +179,7 @@ protected boolean hasSchemaChangeBeenSeen(LogMinerEventRow row) {
return getSchemaChangesCache().containsKey(row.getScn().toString());
}
//different from AbstractInfinispanLogMinerEventProcessor
@Override
protected T getAndRemoveTransactionFromCache(String transactionId) {
return getTransactionCache().remove(transactionId);
@ -183,6 +191,41 @@ protected void cleanupAfterTransactionRemovedFromCache(T transaction, boolean is
removeEventsWithTransaction(transaction);
}
@Override
protected Iterator<LogMinerEvent> getTransactionEventIterator(T transaction) {
return new Iterator<>() {
private final int count = transaction.getNumberOfEvents();
private LogMinerEvent nextEvent;
private int index = 0;
@Override
public boolean hasNext() {
while (index < count) {
nextEvent = getEventCache().get(transaction.getEventId(index));
if (nextEvent == null) {
LOGGER.debug("Event {} must have been undone, skipped.", index);
// There are situations where an event will be removed from the cache when it is
// undone by the undo-row flag. The event id isn't re-used in this use case so
// the iterator automatically detects null entries and skips them by advancing
// to the next entry until either we've reached the number of events or detected
// a non-null entry available for return
index++;
continue;
}
break;
}
return index < count;
}
@Override
public LogMinerEvent next() {
index++;
return nextEvent;
}
};
}
@Override
protected void finalizeTransactionCommit(String transactionId, Scn commitScn) {
getAbandonedTransactionsCache().remove(transactionId);
@ -268,59 +311,6 @@ protected PreparedStatement createQueryStatement() throws SQLException {
ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
private void removeEventsWithTransaction(T transaction) {
// Clear the event queue for the transaction
for (int i = 0; i < transaction.getNumberOfEvents(); ++i) {
getEventCache().remove(transaction.getEventId(i));
}
inMemoryPendingTransactionsCache.remove(transaction.getTransactionId());
}
/**
* A comparator that guarantees that the sort order applied to event keys is such that
* they are treated as numerical values, sorted as numeric values rather than strings
* which would allow "100" to come before "9".
*/
private static class EventKeySortComparator implements Comparator<String> {
public static final EventKeySortComparator INSTANCE = new EventKeySortComparator();
@Override
public int compare(String o1, String o2) {
if (o1 == null || !o1.contains("-")) {
throw new IllegalStateException("Event Key must be in the format of <transaction>-<event>");
}
if (o2 == null || !o2.contains("-")) {
throw new IllegalStateException("Event Key must be in the format of <transaction>-<event>");
}
final String[] s1 = o1.split("-");
final String[] s2 = o2.split("-");
// Compare transaction ids, these should generally be identical.
int result = s1[0].compareTo(s2[0]);
if (result == 0) {
result = Long.compare(Long.parseLong(s1[1]), Long.parseLong(s2[1]));
}
return result;
}
}
/**
* Purge the necessary caches with all entries that occurred prior to the specified change number.
* <p>
* NOTE: This method is abstract despite the code used by both all implementations being identical.
* This is because the method needed {@code entrySet()} is made available on two different concrete
* interfaces between the embedded and remote cache implementations, and therefore we need to access
* this method from the concrete implementation classes (RemoteCache and Cache) rather than from
* the common class used by CacheProvider (BasicCache).
*
* @param minCacheScn the minimum system change number to keep entries until
*/
protected void purgeCache(Scn minCacheScn) {
getProcessedTransactionsCache().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0);
getSchemaChangesCache().removeIf(entry -> Scn.valueOf(entry.getKey()).compareTo(minCacheScn) < 0);
}
@Override
protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws InterruptedException {
@ -378,39 +368,22 @@ protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws Inter
}
}
@Override
protected Iterator<LogMinerEvent> getTransactionEventIterator(T transaction) {
return new Iterator<>() {
private final int count = transaction.getNumberOfEvents();
/**
* Purge the necessary caches with all entries that occurred prior to the specified change number.
*
* @param minCacheScn the minimum system change number to keep entries until
*/
protected void purgeCache(Scn minCacheScn) {
getProcessedTransactionsCache().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0);
getSchemaChangesCache().removeIf(entry -> Scn.valueOf(entry.getKey()).compareTo(minCacheScn) < 0);
}
private LogMinerEvent nextEvent;
private int index = 0;
@Override
public boolean hasNext() {
while (index < count) {
nextEvent = getEventCache().get(transaction.getEventId(index));
if (nextEvent == null) {
LOGGER.debug("Event {} must have been undone, skipped.", index);
// There are situations where an event will be removed from the cache when it is
// undone by the undo-row flag. The event id isn't re-used in this use case so
// the iterator automatically detects null entries and skips them by advancing
// to the next entry until either we've reached the number of events or detected
// a non-null entry available for return
index++;
continue;
}
break;
}
return index < count;
}
@Override
public LogMinerEvent next() {
index++;
return nextEvent;
}
};
private void removeEventsWithTransaction(T transaction) {
// Clear the event queue for the transaction
for (int i = 0; i < transaction.getNumberOfEvents(); ++i) {
getEventCache().remove(transaction.getEventId(i));
}
inMemoryPendingTransactionsCache.remove(transaction.getTransactionId());
}
private void removeEvents(LogMinerEventRow row, List<String> eventKeys) {
@ -426,4 +399,33 @@ private void removeEvents(LogMinerEventRow row, List<String> eventKeys) {
Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found.", row.getTableId(),
row.getRowId());
}
/**
* A comparator that guarantees that the sort order applied to event keys is such that
* they are treated as numerical values, sorted as numeric values rather than strings
* which would allow "100" to come before "9".
*/
private static class EventKeySortComparator implements Comparator<String> {
public static final EventKeySortComparator INSTANCE = new EventKeySortComparator();
@Override
public int compare(String o1, String o2) {
if (o1 == null || !o1.contains("-")) {
throw new IllegalStateException("Event Key must be in the format of <transaction>-<event>");
}
if (o2 == null || !o2.contains("-")) {
throw new IllegalStateException("Event Key must be in the format of <transaction>-<event>");
}
final String[] s1 = o1.split("-");
final String[] s2 = o2.split("-");
// Compare transaction ids, these should generally be identical.
int result = s1[0].compareTo(s2[0]);
if (result == 0) {
result = Long.compare(Long.parseLong(s1[1]), Long.parseLong(s2[1]));
}
return result;
}
}
}