From 15e9231c309b1e53021914714e222542e3c108a1 Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Tue, 31 May 2022 16:05:14 -0400 Subject: [PATCH] DBZ-5179 Use `LRUCacheMap` to avoid high memory usage --- ...acleStreamingChangeEventSourceMetrics.java | 19 ++--- .../oracle/OracleStreamingMetricsTest.java | 42 +++++++++++ .../java/io/debezium/util/LRUCacheMap.java | 69 +++++++++++++++++++ .../modules/ROOT/pages/connectors/oracle.adoc | 4 +- 4 files changed, 123 insertions(+), 11 deletions(-) create mode 100644 debezium-core/src/main/java/io/debezium/util/LRUCacheMap.java diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleStreamingChangeEventSourceMetrics.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleStreamingChangeEventSourceMetrics.java index c09b7dd2d..9dbfbff95 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleStreamingChangeEventSourceMetrics.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleStreamingChangeEventSourceMetrics.java @@ -11,7 +11,6 @@ import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.Arrays; -import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -27,6 +26,7 @@ import io.debezium.connector.common.CdcSourceTaskContext; import io.debezium.pipeline.metrics.DefaultStreamingChangeEventSourceMetrics; import io.debezium.pipeline.source.spi.EventMetadataProvider; +import io.debezium.util.LRUCacheMap; /** * The metrics implementation for Oracle connector streaming phase. @@ -38,6 +38,7 @@ public class OracleStreamingChangeEventSourceMetrics extends DefaultStreamingCha private static final Logger LOGGER = LoggerFactory.getLogger(OracleStreamingChangeEventSourceMetrics.class); private static final long MILLIS_PER_SECOND = 1000L; + private static final int TRANSACTION_ID_SET_SIZE = 10; private final AtomicReference currentScn = new AtomicReference<>(); private final AtomicInteger logMinerQueryCount = new AtomicInteger(); @@ -79,8 +80,8 @@ public class OracleStreamingChangeEventSourceMetrics extends DefaultStreamingCha private final AtomicLong activeTransactions = new AtomicLong(); private final AtomicLong rolledBackTransactions = new AtomicLong(); private final AtomicLong committedTransactions = new AtomicLong(); - private final AtomicReference> abandonedTransactionIds = new AtomicReference<>(); - private final AtomicReference> rolledBackTransactionIds = new AtomicReference<>(); + private final AtomicReference> abandonedTransactionIds = new AtomicReference<>(); + private final AtomicReference> rolledBackTransactionIds = new AtomicReference<>(); private final AtomicLong registeredDmlCount = new AtomicLong(); private final AtomicLong committedDmlCount = new AtomicLong(); private final AtomicInteger errorCount = new AtomicInteger(); @@ -198,8 +199,8 @@ public void reset() { committedTransactions.set(0); registeredDmlCount.set(0); committedDmlCount.set(0); - abandonedTransactionIds.set(new HashSet<>()); - rolledBackTransactionIds.set(new HashSet<>()); + abandonedTransactionIds.set(new LRUCacheMap<>(TRANSACTION_ID_SET_SIZE)); + rolledBackTransactionIds.set(new LRUCacheMap<>(TRANSACTION_ID_SET_SIZE)); errorCount.set(0); warningCount.set(0); scnFreezeCount.set(0); @@ -557,12 +558,12 @@ public long getMinLagFromSourceInMilliseconds() { @Override public Set getAbandonedTransactionIds() { - return abandonedTransactionIds.get(); + return abandonedTransactionIds.get().keySet(); } @Override public Set getRolledBackTransactionIds() { - return rolledBackTransactionIds.get(); + return rolledBackTransactionIds.get().keySet(); } @Override @@ -661,13 +662,13 @@ public void incrementScnFreezeCount() { public void addAbandonedTransactionId(String transactionId) { if (transactionId != null) { - abandonedTransactionIds.get().add(transactionId); + abandonedTransactionIds.get().put(transactionId, transactionId); } } public void addRolledBackTransactionId(String transactionId) { if (transactionId != null) { - rolledBackTransactionIds.get().add(transactionId); + rolledBackTransactionIds.get().put(transactionId, transactionId); } } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleStreamingMetricsTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleStreamingMetricsTest.java index fb4d90ffb..688177c61 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleStreamingMetricsTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleStreamingMetricsTest.java @@ -316,6 +316,48 @@ public void testCustomTransactionRetention() throws Exception { assertThat(metrics.getHoursToKeepTransactionInBuffer()).isEqualTo(3); } + @Test + @FixFor("DBZ-5179") + public void testRollbackTransactionIdSetSizeLimit() throws Exception { + init(TestHelper.defaultConfig().with(OracleConnectorConfig.LOG_MINING_TRANSACTION_RETENTION, 3)); + + // Check state up to maximum size + for (int i = 1; i <= 10; ++i) { + metrics.addRolledBackTransactionId(String.valueOf(i)); + } + assertThat(metrics.getRolledBackTransactionIds()).containsOnly("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"); + + // Add another rollback transaction, does not exist in set + metrics.addRolledBackTransactionId("11"); + assertThat(metrics.getRolledBackTransactionIds()).containsOnly("2", "3", "4", "5", "6", "7", "8", "9", "10", "11"); + + // Add another rollback transaction, this time the same as before + // Set should be unchanged. + metrics.addRolledBackTransactionId("11"); + assertThat(metrics.getRolledBackTransactionIds()).containsOnly("2", "3", "4", "5", "6", "7", "8", "9", "10", "11"); + } + + @Test + @FixFor("DBZ-5179") + public void testAbandonedTransactionIdSetSizeLimit() throws Exception { + init(TestHelper.defaultConfig().with(OracleConnectorConfig.LOG_MINING_TRANSACTION_RETENTION, 3)); + + // Check state up to maximum size + for (int i = 1; i <= 10; ++i) { + metrics.addAbandonedTransactionId(String.valueOf(i)); + } + assertThat(metrics.getAbandonedTransactionIds()).containsOnly("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"); + + // Add another abandoned transaction, does not exist in set + metrics.addAbandonedTransactionId("11"); + assertThat(metrics.getAbandonedTransactionIds()).containsOnly("2", "3", "4", "5", "6", "7", "8", "9", "10", "11"); + + // Add another abandoned transaction, this time the same as before + // Set should be unchanged. + metrics.addAbandonedTransactionId("11"); + assertThat(metrics.getAbandonedTransactionIds()).containsOnly("2", "3", "4", "5", "6", "7", "8", "9", "10", "11"); + } + private void init(Configuration.Builder builder) { this.connectorConfig = new OracleConnectorConfig(builder.build()); diff --git a/debezium-core/src/main/java/io/debezium/util/LRUCacheMap.java b/debezium-core/src/main/java/io/debezium/util/LRUCacheMap.java new file mode 100644 index 000000000..c7515ab44 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/util/LRUCacheMap.java @@ -0,0 +1,69 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.util; + +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.common.cache.Cache; + +import io.debezium.annotation.NotThreadSafe; + +/** + * A custom implementation of {@link org.apache.kafka.common.cache.LRUCache} that allows exposure + * to the underlying delegate's key or values collections. + * + * @author Chris Cranford + */ +@NotThreadSafe +public class LRUCacheMap implements Cache { + + private final LinkedHashMap delegate; + + public LRUCacheMap(int maxSize) { + this.delegate = new LinkedHashMap(16, 0.75F, true) { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return this.size() > maxSize; + } + }; + } + + @Override + public V get(K key) { + return delegate.get(key); + } + + @Override + public void put(K key, V value) { + delegate.put(key, value); + } + + @Override + public boolean remove(K key) { + return delegate.remove(key) != null; + } + + @Override + public long size() { + return delegate.size(); + } + + public Set keySet() { + return delegate.keySet(); + } + + public Collection values() { + return delegate.values(); + } + + @Override + public String toString() { + return delegate.toString(); + } +} diff --git a/documentation/modules/ROOT/pages/connectors/oracle.adoc b/documentation/modules/ROOT/pages/connectors/oracle.adoc index 8cc47b243..d6443833f 100644 --- a/documentation/modules/ROOT/pages/connectors/oracle.adoc +++ b/documentation/modules/ROOT/pages/connectors/oracle.adoc @@ -3268,12 +3268,12 @@ See <> |`string[]` -|An array of abandoned transaction identifiers removed from the transaction buffer due to their age. +|An array of the most recent abandoned transaction identifiers removed from the transaction buffer due to their age. See <> for details. |[[oracle-streaming-metrics-rolled-back-transaction-ids]]<> |`string[]` -|An array of transaction identifiers that have been mined and rolled back in the transaction buffer. +|An array of the most recent transaction identifiers that have been mined and rolled back in the transaction buffer. |[[oracle-streaming-metrics-last-commit-duration-in-milliseconds]]<> |`long`