From f71ddd92190c00aba585db3520274e12f9384fb0 Mon Sep 17 00:00:00 2001 From: mfvitale Date: Tue, 30 Jul 2024 16:26:27 +0200 Subject: [PATCH] DBZ-8035 Add pause/resume operation for ActivityMonitoringMeter JMXBean --- .../postgresql/PostgresMetricsIT.java | 68 ++++++++++++++++--- ...aultStreamingChangeEventSourceMetrics.java | 10 +++ .../transforms/ActivityMonitoringMXBean.java | 7 ++ .../transforms/ActivityMonitoringMeter.java | 17 +++++ 4 files changed, 93 insertions(+), 9 deletions(-) diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMetricsIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMetricsIT.java index 035c3ab15..61695f68b 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMetricsIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMetricsIT.java @@ -14,9 +14,11 @@ import java.util.concurrent.TimeUnit; import javax.management.InstanceNotFoundException; +import javax.management.MBeanException; import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; +import javax.management.ReflectionException; import javax.management.openmbean.TabularDataSupport; import org.awaitility.Awaitility; @@ -178,6 +180,39 @@ public void testAdvancedStreamingMetrics() throws Exception { assertStreamingMetrics(true); } + @Test + public void testPauseAndResumeAdvancedStreamingMetrics() throws Exception { + // Setup + TestHelper.execute(INIT_STATEMENTS); + + // start connector + start(PostgresConnector.class, + TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) + .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) + .with(PostgresConnectorConfig.ADVANCED_METRICS_ENABLE, Boolean.TRUE) + .build()); + + assertSnapshotNotExecutedMetrics(); + assertStreamingMetrics(true); + + invokeOperation(getStreamingMetricsObjectName(), "pause"); + insertRecords(); + assertAdvancedMetrics(2); + + invokeOperation(getStreamingMetricsObjectName(), "resume"); + insertRecords(); + assertAdvancedMetrics(4); + } + + private void insertRecords() throws InterruptedException { + // Wait for the streaming to begin + TestConsumer consumer = testConsumer(2, "public"); + TestHelper.execute(INSERT_STATEMENTS); + consumer.await(TestHelper.waitTimeForRecords() * 30L, TimeUnit.SECONDS); + Thread.sleep(Duration.ofSeconds(2).toMillis()); + } + private void assertSnapshotMetrics() throws Exception { final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); @@ -258,17 +293,32 @@ private void assertStreamingMetrics(boolean checkAdvancedMetrics) throws Excepti // Assertions.assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "CapturedTables")).isEqualTo(new String[] {"public.simple"}); if (checkAdvancedMetrics) { - TabularDataSupport numberOfCreateEventsSeen = (TabularDataSupport) mBeanServer.getAttribute(getStreamingMetricsObjectName(), "NumberOfCreateEventsSeen"); - - String values = numberOfCreateEventsSeen.values().stream() - .limit(1) - .toList() - .get(0) - .toString(); - assertThat(values).isEqualTo( - "javax.management.openmbean.CompositeDataSupport(compositeType=javax.management.openmbean.CompositeType(name=java.util.Map,items=((itemName=key,itemType=javax.management.openmbean.SimpleType(name=java.lang.String)),(itemName=value,itemType=javax.management.openmbean.SimpleType(name=java.lang.Long)))),contents={key=public.simple, value=2})"); + assertAdvancedMetrics(2); } + } + public void assertAdvancedMetrics(int expectedInsert) throws Exception { + + final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + + TabularDataSupport numberOfCreateEventsSeen = (TabularDataSupport) mBeanServer.getAttribute(getStreamingMetricsObjectName(), "NumberOfCreateEventsSeen"); + + String values = numberOfCreateEventsSeen.values().stream() + .limit(1) + .toList() + .get(0) + .toString(); + assertThat(values).isEqualTo( + "javax.management.openmbean.CompositeDataSupport(compositeType=javax.management.openmbean.CompositeType(name=java.util.Map,items=((itemName=key,itemType=javax.management.openmbean.SimpleType(name=java.lang.String)),(itemName=value,itemType=javax.management.openmbean.SimpleType(name=java.lang.Long)))),contents={key=public.simple, value=" + + expectedInsert + "})"); + } + + private void invokeOperation(ObjectName objectName, String operation) + throws MalformedObjectNameException, ReflectionException, InstanceNotFoundException, MBeanException { + + MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + + server.invoke(objectName, operation, new Object[]{}, new String[]{}); } private void assertStreamingWithCustomMetrics(Map customMetricTags) throws Exception { diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/DefaultStreamingChangeEventSourceMetrics.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/DefaultStreamingChangeEventSourceMetrics.java index a1cdb3212..81ff71661 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/metrics/DefaultStreamingChangeEventSourceMetrics.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/DefaultStreamingChangeEventSourceMetrics.java @@ -106,6 +106,16 @@ public void reset() { activityMonitoringMeter.reset(); } + @Override + public void pause() { + activityMonitoringMeter.pause(); + } + + @Override + public void resume() { + activityMonitoringMeter.resume(); + } + @Override public Map getNumberOfCreateEventsSeen() { return activityMonitoringMeter.getNumberOfCreateEventsSeen(); diff --git a/debezium-core/src/main/java/io/debezium/transforms/ActivityMonitoringMXBean.java b/debezium-core/src/main/java/io/debezium/transforms/ActivityMonitoringMXBean.java index cd6020a39..7f875ae20 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/ActivityMonitoringMXBean.java +++ b/debezium-core/src/main/java/io/debezium/transforms/ActivityMonitoringMXBean.java @@ -7,6 +7,9 @@ import java.util.Map; +/** + * Exposes advanced metrics used for monitoring DB activity. + */ public interface ActivityMonitoringMXBean { Map getNumberOfCreateEventsSeen(); @@ -14,4 +17,8 @@ public interface ActivityMonitoringMXBean { Map getNumberOfDeleteEventsSeen(); Map getNumberOfUpdateEventsSeen(); + + void pause(); + + void resume(); } diff --git a/debezium-core/src/main/java/io/debezium/transforms/ActivityMonitoringMeter.java b/debezium-core/src/main/java/io/debezium/transforms/ActivityMonitoringMeter.java index 15fca3ede..79aab45a7 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/ActivityMonitoringMeter.java +++ b/debezium-core/src/main/java/io/debezium/transforms/ActivityMonitoringMeter.java @@ -27,8 +27,15 @@ public class ActivityMonitoringMeter implements ActivityMonitoringMXBean { private final ActivityCounter updateCount = new ActivityCounter(); private final ActivityCounter deleteCount = new ActivityCounter(); + private boolean isPaused = false; + public void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value, Envelope.Operation operation) { + if (isPaused) { + LOGGER.trace("ActivityMonitoringMeter is paused, no metric will be collected."); + return; + } + LOGGER.trace("Received record {} with key {}", value, key); String tableName = source.identifier(); switch (operation) { @@ -63,6 +70,16 @@ public Map getNumberOfUpdateEventsSeen() { return updateCount.getCounter(); } + @Override + public void pause() { + isPaused = true; + } + + @Override + public void resume() { + isPaused = false; + } + public void reset() { createCount.reset(); updateCount.reset();