From d38c7b4325c89d8afab5d8196b146eafa6e177d8 Mon Sep 17 00:00:00 2001 From: mfvitale Date: Tue, 30 Jul 2024 15:14:28 +0200 Subject: [PATCH] DBZ-8035 Add support for number of events seen by table metric --- .../postgresql/PostgresMetricsIT.java | 37 ++++++- .../config/CommonConnectorConfig.java | 19 +++- ...aultStreamingChangeEventSourceMetrics.java | 23 ++++ ...reamingChangeEventSourceMetricsMXBean.java | 3 +- .../transforms/ActivityMonitoringMXBean.java | 17 +++ .../transforms/ActivityMonitoringMeter.java | 103 ++++++++++++++++++ .../ActivityMonitoringMeterTest.java | 69 ++++++++++++ 7 files changed, 266 insertions(+), 5 deletions(-) create mode 100644 debezium-core/src/main/java/io/debezium/transforms/ActivityMonitoringMXBean.java create mode 100644 debezium-core/src/main/java/io/debezium/transforms/ActivityMonitoringMeter.java create mode 100644 debezium-core/src/test/java/io/debezium/transforms/ActivityMonitoringMeterTest.java diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMetricsIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMetricsIT.java index 72e2f92d8..035c3ab15 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMetricsIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMetricsIT.java @@ -17,6 +17,7 @@ import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; +import javax.management.openmbean.TabularDataSupport; import org.awaitility.Awaitility; import org.junit.After; @@ -122,7 +123,7 @@ public void testSnapshotAndStreamingMetrics() throws Exception { .build()); assertSnapshotMetrics(); - assertStreamingMetrics(); + assertStreamingMetrics(false); } @Test @@ -157,7 +158,24 @@ public void testStreamingOnlyMetrics() throws Exception { .build()); assertSnapshotNotExecutedMetrics(); - assertStreamingMetrics(); + assertStreamingMetrics(false); + } + + @Test + public void testAdvancedStreamingMetrics() throws Exception { + // Setup + TestHelper.execute(INIT_STATEMENTS); + + // start connector + start(PostgresConnector.class, + TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA) + .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) + .with(PostgresConnectorConfig.ADVANCED_METRICS_ENABLE, Boolean.TRUE) + .build()); + + assertSnapshotNotExecutedMetrics(); + assertStreamingMetrics(true); } private void assertSnapshotMetrics() throws Exception { @@ -220,7 +238,7 @@ private void assertSnapshotNotExecutedMetrics() throws Exception { assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted")).isEqualTo(false); } - private void assertStreamingMetrics() throws Exception { + private void assertStreamingMetrics(boolean checkAdvancedMetrics) throws Exception { final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); // Wait for the streaming to begin @@ -238,6 +256,19 @@ private void assertStreamingMetrics() throws Exception { assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "TotalNumberOfEventsSeen")).isEqualTo(2L); // todo: this does not seem to be populated? // Assertions.assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "CapturedTables")).isEqualTo(new String[] {"public.simple"}); + + if (checkAdvancedMetrics) { + TabularDataSupport numberOfCreateEventsSeen = (TabularDataSupport) mBeanServer.getAttribute(getStreamingMetricsObjectName(), "NumberOfCreateEventsSeen"); + + String values = numberOfCreateEventsSeen.values().stream() + .limit(1) + .toList() + .get(0) + .toString(); + assertThat(values).isEqualTo( + "javax.management.openmbean.CompositeDataSupport(compositeType=javax.management.openmbean.CompositeType(name=java.util.Map,items=((itemName=key,itemType=javax.management.openmbean.SimpleType(name=java.lang.String)),(itemName=value,itemType=javax.management.openmbean.SimpleType(name=java.lang.Long)))),contents={key=public.simple, value=2})"); + } + } private void assertStreamingWithCustomMetrics(Map customMetricTags) throws Exception { diff --git a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java index b8bea5c51..c580111dd 100644 --- a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java @@ -82,6 +82,7 @@ public abstract class CommonConnectorConfig { protected final boolean snapshotModeConfigurationBasedSnapshotOnSchemaError; protected final boolean snapshotModeConfigurationBasedSnapshotOnDataError; protected final boolean isLogPositionCheckEnabled; + protected final boolean isAdvancedMetricsEnabled; /** * The set of predefined versions e.g. for source struct maker version @@ -1075,6 +1076,16 @@ public static SnapshotQueryMode parse(String value, String defaultValue) { .optional() .withDescription("When enabled the connector checks if the position stored in the offset is still available in the log"); + public static final Field ADVANCED_METRICS_ENABLE = Field.createInternal("advanced.metrics.enable") + .withDisplayName("Enable/Disable advance metrics") + .withType(Type.BOOLEAN) + .withDefault(false) + .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 31)) + .withWidth(Width.MEDIUM) + .withImportance(Importance.MEDIUM) + .optional() + .withDescription("When enabled the connector will emit advanced streaming metrics"); + protected static final ConfigDefinition CONFIG_DEFINITION = ConfigDefinition.editor() .connector( EVENT_PROCESSING_FAILURE_HANDLING_MODE, @@ -1099,7 +1110,8 @@ public static SnapshotQueryMode parse(String value, String defaultValue) { QUERY_FETCH_SIZE, MAX_RETRIES_ON_ERROR, INCREMENTAL_SNAPSHOT_WATERMARKING_STRATEGY, - LOG_POSITION_CHECK_ENABLED) + LOG_POSITION_CHECK_ENABLED, + ADVANCED_METRICS_ENABLE) .events( CUSTOM_CONVERTERS, CUSTOM_POST_PROCESSORS, @@ -1213,6 +1225,7 @@ protected CommonConnectorConfig(Configuration config, int defaultSnapshotFetchSi this.snapshotModeConfigurationBasedSnapshotOnSchemaError = config.getBoolean(SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_ON_SCHEMA_ERROR); this.snapshotModeConfigurationBasedSnapshotOnDataError = config.getBoolean(SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_ON_DATA_ERROR); this.isLogPositionCheckEnabled = config.getBoolean(LOG_POSITION_CHECK_ENABLED); + this.isAdvancedMetricsEnabled = config.getBoolean(ADVANCED_METRICS_ENABLE); this.signalingDataCollectionId = !Strings.isNullOrBlank(this.signalingDataCollection) ? TableId.parse(this.signalingDataCollection) @@ -1580,6 +1593,10 @@ public boolean isLogPositionCheckEnabled() { return isLogPositionCheckEnabled; } + public boolean isAdvancedMetricsEnabled() { + return isAdvancedMetricsEnabled; + } + public EnumeratedValue snapshotQueryMode() { return this.snapshotQueryMode; } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/DefaultStreamingChangeEventSourceMetrics.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/DefaultStreamingChangeEventSourceMetrics.java index 49726e2f4..a1cdb3212 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/metrics/DefaultStreamingChangeEventSourceMetrics.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/DefaultStreamingChangeEventSourceMetrics.java @@ -20,6 +20,7 @@ import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.Partition; import io.debezium.spi.schema.DataCollectionId; +import io.debezium.transforms.ActivityMonitoringMeter; /** * The default implementation of metrics related to the streaming phase of a connector. @@ -32,12 +33,14 @@ public class DefaultStreamingChangeEventSourceMetrics

exten private final ConnectionMeter connectionMeter; private final StreamingMeter streamingMeter; + private final ActivityMonitoringMeter activityMonitoringMeter; public DefaultStreamingChangeEventSourceMetrics(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics, EventMetadataProvider metadataProvider) { super(taskContext, "streaming", changeEventQueueMetrics, metadataProvider); streamingMeter = new StreamingMeter(taskContext, metadataProvider); connectionMeter = new ConnectionMeter(); + activityMonitoringMeter = new ActivityMonitoringMeter(); } public DefaultStreamingChangeEventSourceMetrics(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics, @@ -45,6 +48,7 @@ public DefaultStreamingChangeEventSourceMetrics super(taskContext, changeEventQueueMetrics, metadataProvider, tags); streamingMeter = new StreamingMeter(taskContext, metadataProvider); connectionMeter = new ConnectionMeter(); + activityMonitoringMeter = new ActivityMonitoringMeter(); } @Override @@ -80,6 +84,9 @@ public long getNumberOfCommittedTransactions() { public void onEvent(P partition, DataCollectionId source, OffsetContext offset, Object key, Struct value, Operation operation) { super.onEvent(partition, source, offset, key, value, operation); streamingMeter.onEvent(source, offset, key, value); + if (taskContext.getConfig().isAdvancedMetricsEnabled()) { + activityMonitoringMeter.onEvent(source, offset, key, value, operation); + } } @Override @@ -96,5 +103,21 @@ public void reset() { super.reset(); streamingMeter.reset(); connectionMeter.reset(); + activityMonitoringMeter.reset(); + } + + @Override + public Map getNumberOfCreateEventsSeen() { + return activityMonitoringMeter.getNumberOfCreateEventsSeen(); + } + + @Override + public Map getNumberOfDeleteEventsSeen() { + return activityMonitoringMeter.getNumberOfDeleteEventsSeen(); + } + + @Override + public Map getNumberOfUpdateEventsSeen() { + return activityMonitoringMeter.getNumberOfUpdateEventsSeen(); } } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/StreamingChangeEventSourceMetricsMXBean.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/StreamingChangeEventSourceMetricsMXBean.java index 8f829aa40..bf9a9fb95 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/metrics/StreamingChangeEventSourceMetricsMXBean.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/StreamingChangeEventSourceMetricsMXBean.java @@ -7,6 +7,7 @@ import io.debezium.pipeline.metrics.traits.ConnectionMetricsMXBean; import io.debezium.pipeline.metrics.traits.StreamingMetricsMXBean; +import io.debezium.transforms.ActivityMonitoringMXBean; /** * Metrics specific to streaming change event sources @@ -14,5 +15,5 @@ * @author Randall Hauch, Jiri Pechanec */ public interface StreamingChangeEventSourceMetricsMXBean extends ChangeEventSourceMetricsMXBean, - ConnectionMetricsMXBean, StreamingMetricsMXBean { + ConnectionMetricsMXBean, StreamingMetricsMXBean, ActivityMonitoringMXBean { } diff --git a/debezium-core/src/main/java/io/debezium/transforms/ActivityMonitoringMXBean.java b/debezium-core/src/main/java/io/debezium/transforms/ActivityMonitoringMXBean.java new file mode 100644 index 000000000..cd6020a39 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/transforms/ActivityMonitoringMXBean.java @@ -0,0 +1,17 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.transforms; + +import java.util.Map; + +public interface ActivityMonitoringMXBean { + + Map getNumberOfCreateEventsSeen(); + + Map getNumberOfDeleteEventsSeen(); + + Map getNumberOfUpdateEventsSeen(); +} diff --git a/debezium-core/src/main/java/io/debezium/transforms/ActivityMonitoringMeter.java b/debezium-core/src/main/java/io/debezium/transforms/ActivityMonitoringMeter.java new file mode 100644 index 000000000..15fca3ede --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/transforms/ActivityMonitoringMeter.java @@ -0,0 +1,103 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.transforms; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +import org.apache.kafka.connect.data.Struct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.data.Envelope; +import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.spi.schema.DataCollectionId; + +public class ActivityMonitoringMeter implements ActivityMonitoringMXBean { + + private static final Logger LOGGER = LoggerFactory.getLogger(ActivityMonitoringMeter.class); + + private final ActivityCounter createCount = new ActivityCounter(); + private final ActivityCounter updateCount = new ActivityCounter(); + private final ActivityCounter deleteCount = new ActivityCounter(); + + public void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value, Envelope.Operation operation) { + + LOGGER.trace("Received record {} with key {}", value, key); + String tableName = source.identifier(); + switch (operation) { + case CREATE: + createCount.add(1, tableName); + break; + case UPDATE: + updateCount.add(1, tableName); + break; + case DELETE: + deleteCount.add(1, tableName); + break; + default: + break; + } + + LOGGER.trace("Counter status create:{}, delete:{}, update:{}", createCount, deleteCount, updateCount); + } + + @Override + public Map getNumberOfCreateEventsSeen() { + return createCount.getCounter(); + } + + @Override + public Map getNumberOfDeleteEventsSeen() { + return deleteCount.getCounter(); + } + + @Override + public Map getNumberOfUpdateEventsSeen() { + return updateCount.getCounter(); + } + + public void reset() { + createCount.reset(); + updateCount.reset(); + deleteCount.reset(); + } + + public static class ActivityCounter { + + private final ConcurrentMap counterByCollection = new ConcurrentHashMap<>(); + + public ActivityCounter() { + } + + public void add(int increment, String tableName) { + + counterByCollection.compute(tableName, (k, v) -> { + + if (v == null) { + return new AtomicLong(increment); + } + + v.addAndGet(increment); + + return v; + }); + + } + + public Map getCounter() { + return counterByCollection.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get())); + } + + public void reset() { + counterByCollection.clear(); + } + } +} diff --git a/debezium-core/src/test/java/io/debezium/transforms/ActivityMonitoringMeterTest.java b/debezium-core/src/test/java/io/debezium/transforms/ActivityMonitoringMeterTest.java new file mode 100644 index 000000000..024f229a6 --- /dev/null +++ b/debezium-core/src/test/java/io/debezium/transforms/ActivityMonitoringMeterTest.java @@ -0,0 +1,69 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.transforms; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Map; + +import org.junit.Test; + +import io.debezium.data.Envelope; +import io.debezium.relational.TableId; + +public class ActivityMonitoringMeterTest { + + ActivityMonitoringMeter activityMonitoringMeter; + + @Test + public void whenNoEventIsReceivedAndNewCreatedMeterThenNoMetricsMustBeReturned() { + + activityMonitoringMeter = new ActivityMonitoringMeter(); + + assertThat(activityMonitoringMeter.getNumberOfCreateEventsSeen()).isEmpty(); + assertThat(activityMonitoringMeter.getNumberOfUpdateEventsSeen()).isEmpty(); + assertThat(activityMonitoringMeter.getNumberOfDeleteEventsSeen()).isEmpty(); + + } + + @Test + public void whenInsertEventIsReceivedThenCorrectMetricsMustBeReturned() { + + activityMonitoringMeter = new ActivityMonitoringMeter(); + + activityMonitoringMeter.onEvent(TableId.parse("db.schema.table"), null, 1L, null, Envelope.Operation.CREATE); + activityMonitoringMeter.onEvent(TableId.parse("db.schema.table"), null, 1L, null, Envelope.Operation.CREATE); + activityMonitoringMeter.onEvent(TableId.parse("db.schema.table"), null, 1L, null, Envelope.Operation.UPDATE); + activityMonitoringMeter.onEvent(TableId.parse("db.schema.table"), null, 1L, null, Envelope.Operation.DELETE); + activityMonitoringMeter.onEvent(TableId.parse("db.schema.anotherTable"), null, 1L, null, Envelope.Operation.CREATE); + + assertThat(activityMonitoringMeter.getNumberOfCreateEventsSeen()) + .contains(Map.entry("db.schema.table", 2L), Map.entry("db.schema.anotherTable", 1L)); + assertThat(activityMonitoringMeter.getNumberOfUpdateEventsSeen()) + .contains(Map.entry("db.schema.table", 1L)); + assertThat(activityMonitoringMeter.getNumberOfDeleteEventsSeen()) + .contains(Map.entry("db.schema.table", 1L)); + + } + + @Test + public void whenMeterIsResetThenNoMetricsMustBeReturned() { + + activityMonitoringMeter = new ActivityMonitoringMeter(); + + activityMonitoringMeter.onEvent(TableId.parse("db.schema.table"), null, 1L, null, Envelope.Operation.CREATE); + activityMonitoringMeter.onEvent(TableId.parse("db.schema.table"), null, 1L, null, Envelope.Operation.CREATE); + activityMonitoringMeter.onEvent(TableId.parse("db.schema.table"), null, 1L, null, Envelope.Operation.UPDATE); + activityMonitoringMeter.onEvent(TableId.parse("db.schema.table"), null, 1L, null, Envelope.Operation.DELETE); + activityMonitoringMeter.onEvent(TableId.parse("db.schema.anotherTable"), null, 1L, null, Envelope.Operation.CREATE); + + activityMonitoringMeter.reset(); + + assertThat(activityMonitoringMeter.getNumberOfCreateEventsSeen()).isEmpty(); + assertThat(activityMonitoringMeter.getNumberOfUpdateEventsSeen()).isEmpty(); + assertThat(activityMonitoringMeter.getNumberOfDeleteEventsSeen()).isEmpty(); + } +}