DBZ-5179 Use LRUCacheMap to avoid high memory usage

This commit is contained in:
Chris Cranford 2022-05-31 16:05:14 -04:00 committed by Jiri Pechanec
parent 21e093250e
commit 15e9231c30
4 changed files with 123 additions and 11 deletions

View File

@@ -11,7 +11,6 @@
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@@ -27,6 +26,7 @@
import io.debezium.connector.common.CdcSourceTaskContext; import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.pipeline.metrics.DefaultStreamingChangeEventSourceMetrics; import io.debezium.pipeline.metrics.DefaultStreamingChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider; import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.util.LRUCacheMap;
/** /**
* The metrics implementation for Oracle connector streaming phase. * The metrics implementation for Oracle connector streaming phase.
@@ -38,6 +38,7 @@ public class OracleStreamingChangeEventSourceMetrics extends DefaultStreamingCha
private static final Logger LOGGER = LoggerFactory.getLogger(OracleStreamingChangeEventSourceMetrics.class); private static final Logger LOGGER = LoggerFactory.getLogger(OracleStreamingChangeEventSourceMetrics.class);
private static final long MILLIS_PER_SECOND = 1000L; private static final long MILLIS_PER_SECOND = 1000L;
private static final int TRANSACTION_ID_SET_SIZE = 10;
private final AtomicReference<Scn> currentScn = new AtomicReference<>(); private final AtomicReference<Scn> currentScn = new AtomicReference<>();
private final AtomicInteger logMinerQueryCount = new AtomicInteger(); private final AtomicInteger logMinerQueryCount = new AtomicInteger();
@@ -79,8 +80,8 @@ public class OracleStreamingChangeEventSourceMetrics extends DefaultStreamingCha
private final AtomicLong activeTransactions = new AtomicLong(); private final AtomicLong activeTransactions = new AtomicLong();
private final AtomicLong rolledBackTransactions = new AtomicLong(); private final AtomicLong rolledBackTransactions = new AtomicLong();
private final AtomicLong committedTransactions = new AtomicLong(); private final AtomicLong committedTransactions = new AtomicLong();
private final AtomicReference<Set<String>> abandonedTransactionIds = new AtomicReference<>(); private final AtomicReference<LRUCacheMap<String, String>> abandonedTransactionIds = new AtomicReference<>();
private final AtomicReference<Set<String>> rolledBackTransactionIds = new AtomicReference<>(); private final AtomicReference<LRUCacheMap<String, String>> rolledBackTransactionIds = new AtomicReference<>();
private final AtomicLong registeredDmlCount = new AtomicLong(); private final AtomicLong registeredDmlCount = new AtomicLong();
private final AtomicLong committedDmlCount = new AtomicLong(); private final AtomicLong committedDmlCount = new AtomicLong();
private final AtomicInteger errorCount = new AtomicInteger(); private final AtomicInteger errorCount = new AtomicInteger();
@@ -198,8 +199,8 @@ public void reset() {
committedTransactions.set(0); committedTransactions.set(0);
registeredDmlCount.set(0); registeredDmlCount.set(0);
committedDmlCount.set(0); committedDmlCount.set(0);
abandonedTransactionIds.set(new HashSet<>()); abandonedTransactionIds.set(new LRUCacheMap<>(TRANSACTION_ID_SET_SIZE));
rolledBackTransactionIds.set(new HashSet<>()); rolledBackTransactionIds.set(new LRUCacheMap<>(TRANSACTION_ID_SET_SIZE));
errorCount.set(0); errorCount.set(0);
warningCount.set(0); warningCount.set(0);
scnFreezeCount.set(0); scnFreezeCount.set(0);
@@ -557,12 +558,12 @@ public long getMinLagFromSourceInMilliseconds() {
@Override @Override
public Set<String> getAbandonedTransactionIds() { public Set<String> getAbandonedTransactionIds() {
return abandonedTransactionIds.get(); return abandonedTransactionIds.get().keySet();
} }
@Override @Override
public Set<String> getRolledBackTransactionIds() { public Set<String> getRolledBackTransactionIds() {
return rolledBackTransactionIds.get(); return rolledBackTransactionIds.get().keySet();
} }
@Override @Override
@@ -661,13 +662,13 @@ public void incrementScnFreezeCount() {
public void addAbandonedTransactionId(String transactionId) { public void addAbandonedTransactionId(String transactionId) {
if (transactionId != null) { if (transactionId != null) {
abandonedTransactionIds.get().add(transactionId); abandonedTransactionIds.get().put(transactionId, transactionId);
} }
} }
public void addRolledBackTransactionId(String transactionId) { public void addRolledBackTransactionId(String transactionId) {
if (transactionId != null) { if (transactionId != null) {
rolledBackTransactionIds.get().add(transactionId); rolledBackTransactionIds.get().put(transactionId, transactionId);
} }
} }

View File

@@ -316,6 +316,48 @@ public void testCustomTransactionRetention() throws Exception {
assertThat(metrics.getHoursToKeepTransactionInBuffer()).isEqualTo(3); assertThat(metrics.getHoursToKeepTransactionInBuffer()).isEqualTo(3);
} }
@Test
@FixFor("DBZ-5179")
public void testRollbackTransactionIdSetSizeLimit() throws Exception {
    init(TestHelper.defaultConfig().with(OracleConnectorConfig.LOG_MINING_TRANSACTION_RETENTION, 3));

    // Fill the bounded rollback-id set up to its maximum capacity of 10 entries.
    for (int id = 1; id <= 10; id++) {
        metrics.addRolledBackTransactionId(Integer.toString(id));
    }
    assertThat(metrics.getRolledBackTransactionIds()).containsOnly("1", "2", "3", "4", "5", "6", "7", "8", "9", "10");

    // An unseen transaction id pushes the set past capacity, evicting the
    // least-recently-used entry ("1").
    metrics.addRolledBackTransactionId("11");
    assertThat(metrics.getRolledBackTransactionIds()).containsOnly("2", "3", "4", "5", "6", "7", "8", "9", "10", "11");

    // Re-adding an id already present must leave the contents unchanged.
    metrics.addRolledBackTransactionId("11");
    assertThat(metrics.getRolledBackTransactionIds()).containsOnly("2", "3", "4", "5", "6", "7", "8", "9", "10", "11");
}
@Test
@FixFor("DBZ-5179")
public void testAbandonedTransactionIdSetSizeLimit() throws Exception {
    init(TestHelper.defaultConfig().with(OracleConnectorConfig.LOG_MINING_TRANSACTION_RETENTION, 3));

    // Fill the bounded abandoned-id set up to its maximum capacity of 10 entries.
    for (int id = 1; id <= 10; id++) {
        metrics.addAbandonedTransactionId(Integer.toString(id));
    }
    assertThat(metrics.getAbandonedTransactionIds()).containsOnly("1", "2", "3", "4", "5", "6", "7", "8", "9", "10");

    // An unseen transaction id pushes the set past capacity, evicting the
    // least-recently-used entry ("1").
    metrics.addAbandonedTransactionId("11");
    assertThat(metrics.getAbandonedTransactionIds()).containsOnly("2", "3", "4", "5", "6", "7", "8", "9", "10", "11");

    // Re-adding an id already present must leave the contents unchanged.
    metrics.addAbandonedTransactionId("11");
    assertThat(metrics.getAbandonedTransactionIds()).containsOnly("2", "3", "4", "5", "6", "7", "8", "9", "10", "11");
}
private void init(Configuration.Builder builder) { private void init(Configuration.Builder builder) {
this.connectorConfig = new OracleConnectorConfig(builder.build()); this.connectorConfig = new OracleConnectorConfig(builder.build());

View File

@@ -0,0 +1,69 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.util;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.cache.Cache;
import io.debezium.annotation.NotThreadSafe;
/**
 * A bounded least-recently-used cache in the spirit of
 * {@link org.apache.kafka.common.cache.LRUCache}, implementing the same
 * {@link Cache} contract but additionally exposing the underlying delegate's
 * key and value collections.
 *
 * @author Chris Cranford
 */
@NotThreadSafe
public class LRUCacheMap<K, V> implements Cache<K, V> {

    // Access-ordered LinkedHashMap provides LRU iteration order; the bound is
    // enforced by removeEldestEntry on each insertion.
    private final LinkedHashMap<K, V> delegate;

    /**
     * Creates a cache that retains at most {@code maxSize} entries, evicting the
     * least-recently-accessed entry once the bound would be exceeded.
     *
     * @param maxSize maximum number of entries to retain; must be positive
     * @throws IllegalArgumentException if {@code maxSize} is not positive
     */
    public LRUCacheMap(int maxSize) {
        if (maxSize <= 0) {
            // A non-positive bound would silently evict every entry on insert.
            throw new IllegalArgumentException("maxSize must be positive, but was " + maxSize);
        }
        this.delegate = new LinkedHashMap<K, V>(16, 0.75F, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return this.size() > maxSize;
            }
        };
    }

    /**
     * Returns the value mapped to {@code key}, or {@code null} if absent; a hit
     * marks the entry as most-recently used.
     */
    @Override
    public V get(K key) {
        return delegate.get(key);
    }

    /**
     * Associates {@code value} with {@code key}, evicting the least-recently-used
     * entry if the cache is already at capacity.
     */
    @Override
    public void put(K key, V value) {
        delegate.put(key, value);
    }

    /**
     * Removes the entry for {@code key}.
     *
     * @return {@code true} if a non-null mapping was removed
     */
    @Override
    public boolean remove(K key) {
        return delegate.remove(key) != null;
    }

    /** Returns the current number of cached entries. */
    @Override
    public long size() {
        return delegate.size();
    }

    /** Returns a view of the cached keys, ordered least- to most-recently used. */
    public Set<K> keySet() {
        return delegate.keySet();
    }

    /** Returns a view of the cached values, ordered least- to most-recently used. */
    public Collection<V> values() {
        return delegate.values();
    }

    @Override
    public String toString() {
        return delegate.toString();
    }
}

View File

@@ -3268,12 +3268,12 @@ See <<oracle-property-log-mining-transaction-retention-hours, `log.mining.transa
|[[oracle-streaming-metrics-abandoned-transaction-ids]]<<oracle-streaming-metrics-abandoned-transaction-ids, `+AbandonedTransactionIds+`>> |[[oracle-streaming-metrics-abandoned-transaction-ids]]<<oracle-streaming-metrics-abandoned-transaction-ids, `+AbandonedTransactionIds+`>>
|`string[]` |`string[]`
|An array of abandoned transaction identifiers removed from the transaction buffer due to their age. |An array of the most recent abandoned transaction identifiers removed from the transaction buffer due to their age.
See <<oracle-property-log-mining-transaction-retention-hours, `log.mining.transaction.retention.hours`>> for details. See <<oracle-property-log-mining-transaction-retention-hours, `log.mining.transaction.retention.hours`>> for details.
|[[oracle-streaming-metrics-rolled-back-transaction-ids]]<<oracle-streaming-metrics-rolled-back-transaction-ids, `+RolledBackTransactionIds+`>> |[[oracle-streaming-metrics-rolled-back-transaction-ids]]<<oracle-streaming-metrics-rolled-back-transaction-ids, `+RolledBackTransactionIds+`>>
|`string[]` |`string[]`
|An array of transaction identifiers that have been mined and rolled back in the transaction buffer. |An array of the most recent transaction identifiers that have been mined and rolled back in the transaction buffer.
|[[oracle-streaming-metrics-last-commit-duration-in-milliseconds]]<<oracle-streaming-metrics-last-commit-duration-in-milliseconds, `+LastCommitDurationInMilliseconds+`>> |[[oracle-streaming-metrics-last-commit-duration-in-milliseconds]]<<oracle-streaming-metrics-last-commit-duration-in-milliseconds, `+LastCommitDurationInMilliseconds+`>>
|`long` |`long`