DBZ-8035 Add support for number of events seen by table metric

This commit is contained in:
mfvitale 2024-07-30 15:14:28 +02:00 committed by Jiri Pechanec
parent 65436fec23
commit d38c7b4325
7 changed files with 266 additions and 5 deletions

View File

@ -17,6 +17,7 @@
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.openmbean.TabularDataSupport;
import org.awaitility.Awaitility;
import org.junit.After;
@ -122,7 +123,7 @@ public void testSnapshotAndStreamingMetrics() throws Exception {
.build());
assertSnapshotMetrics();
assertStreamingMetrics();
assertStreamingMetrics(false);
}
@Test
@ -157,7 +158,24 @@ public void testStreamingOnlyMetrics() throws Exception {
.build());
assertSnapshotNotExecutedMetrics();
assertStreamingMetrics();
assertStreamingMetrics(false);
}
@Test
public void testAdvancedStreamingMetrics() throws Exception {
// Setup
TestHelper.execute(INIT_STATEMENTS);
// start connector
start(PostgresConnector.class,
TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA)
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.with(PostgresConnectorConfig.ADVANCED_METRICS_ENABLE, Boolean.TRUE)
.build());
assertSnapshotNotExecutedMetrics();
assertStreamingMetrics(true);
}
private void assertSnapshotMetrics() throws Exception {
@ -220,7 +238,7 @@ private void assertSnapshotNotExecutedMetrics() throws Exception {
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted")).isEqualTo(false);
}
private void assertStreamingMetrics() throws Exception {
private void assertStreamingMetrics(boolean checkAdvancedMetrics) throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
// Wait for the streaming to begin
@ -238,6 +256,19 @@ private void assertStreamingMetrics() throws Exception {
assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "TotalNumberOfEventsSeen")).isEqualTo(2L);
// todo: this does not seem to be populated?
// Assertions.assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "CapturedTables")).isEqualTo(new String[] {"public.simple"});
if (checkAdvancedMetrics) {
TabularDataSupport numberOfCreateEventsSeen = (TabularDataSupport) mBeanServer.getAttribute(getStreamingMetricsObjectName(), "NumberOfCreateEventsSeen");
String values = numberOfCreateEventsSeen.values().stream()
.limit(1)
.toList()
.get(0)
.toString();
assertThat(values).isEqualTo(
"javax.management.openmbean.CompositeDataSupport(compositeType=javax.management.openmbean.CompositeType(name=java.util.Map<java.lang.String, java.lang.Long>,items=((itemName=key,itemType=javax.management.openmbean.SimpleType(name=java.lang.String)),(itemName=value,itemType=javax.management.openmbean.SimpleType(name=java.lang.Long)))),contents={key=public.simple, value=2})");
}
}
private void assertStreamingWithCustomMetrics(Map<String, String> customMetricTags) throws Exception {

View File

@ -82,6 +82,7 @@ public abstract class CommonConnectorConfig {
protected final boolean snapshotModeConfigurationBasedSnapshotOnSchemaError;
protected final boolean snapshotModeConfigurationBasedSnapshotOnDataError;
protected final boolean isLogPositionCheckEnabled;
protected final boolean isAdvancedMetricsEnabled;
/**
* The set of predefined versions e.g. for source struct maker version
@ -1075,6 +1076,16 @@ public static SnapshotQueryMode parse(String value, String defaultValue) {
.optional()
.withDescription("When enabled the connector checks if the position stored in the offset is still available in the log");
public static final Field ADVANCED_METRICS_ENABLE = Field.createInternal("advanced.metrics.enable")
.withDisplayName("Enable/Disable advance metrics")
.withType(Type.BOOLEAN)
.withDefault(false)
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 31))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.optional()
.withDescription("When enabled the connector will emit advanced streaming metrics");
protected static final ConfigDefinition CONFIG_DEFINITION = ConfigDefinition.editor()
.connector(
EVENT_PROCESSING_FAILURE_HANDLING_MODE,
@ -1099,7 +1110,8 @@ public static SnapshotQueryMode parse(String value, String defaultValue) {
QUERY_FETCH_SIZE,
MAX_RETRIES_ON_ERROR,
INCREMENTAL_SNAPSHOT_WATERMARKING_STRATEGY,
LOG_POSITION_CHECK_ENABLED)
LOG_POSITION_CHECK_ENABLED,
ADVANCED_METRICS_ENABLE)
.events(
CUSTOM_CONVERTERS,
CUSTOM_POST_PROCESSORS,
@ -1213,6 +1225,7 @@ protected CommonConnectorConfig(Configuration config, int defaultSnapshotFetchSi
this.snapshotModeConfigurationBasedSnapshotOnSchemaError = config.getBoolean(SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_ON_SCHEMA_ERROR);
this.snapshotModeConfigurationBasedSnapshotOnDataError = config.getBoolean(SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_ON_DATA_ERROR);
this.isLogPositionCheckEnabled = config.getBoolean(LOG_POSITION_CHECK_ENABLED);
this.isAdvancedMetricsEnabled = config.getBoolean(ADVANCED_METRICS_ENABLE);
this.signalingDataCollectionId = !Strings.isNullOrBlank(this.signalingDataCollection)
? TableId.parse(this.signalingDataCollection)
@ -1580,6 +1593,10 @@ public boolean isLogPositionCheckEnabled() {
return isLogPositionCheckEnabled;
}
public boolean isAdvancedMetricsEnabled() {
return isAdvancedMetricsEnabled;
}
public EnumeratedValue snapshotQueryMode() {
return this.snapshotQueryMode;
}

View File

@ -20,6 +20,7 @@
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.transforms.ActivityMonitoringMeter;
/**
* The default implementation of metrics related to the streaming phase of a connector.
@ -32,12 +33,14 @@ public class DefaultStreamingChangeEventSourceMetrics<P extends Partition> exten
private final ConnectionMeter connectionMeter;
private final StreamingMeter streamingMeter;
private final ActivityMonitoringMeter activityMonitoringMeter;
public <T extends CdcSourceTaskContext> DefaultStreamingChangeEventSourceMetrics(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics,
EventMetadataProvider metadataProvider) {
super(taskContext, "streaming", changeEventQueueMetrics, metadataProvider);
streamingMeter = new StreamingMeter(taskContext, metadataProvider);
connectionMeter = new ConnectionMeter();
activityMonitoringMeter = new ActivityMonitoringMeter();
}
public <T extends CdcSourceTaskContext> DefaultStreamingChangeEventSourceMetrics(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics,
@ -45,6 +48,7 @@ public <T extends CdcSourceTaskContext> DefaultStreamingChangeEventSourceMetrics
super(taskContext, changeEventQueueMetrics, metadataProvider, tags);
streamingMeter = new StreamingMeter(taskContext, metadataProvider);
connectionMeter = new ConnectionMeter();
activityMonitoringMeter = new ActivityMonitoringMeter();
}
@Override
@ -80,6 +84,9 @@ public long getNumberOfCommittedTransactions() {
public void onEvent(P partition, DataCollectionId source, OffsetContext offset, Object key, Struct value, Operation operation) {
super.onEvent(partition, source, offset, key, value, operation);
streamingMeter.onEvent(source, offset, key, value);
if (taskContext.getConfig().isAdvancedMetricsEnabled()) {
activityMonitoringMeter.onEvent(source, offset, key, value, operation);
}
}
@Override
@ -96,5 +103,21 @@ public void reset() {
super.reset();
streamingMeter.reset();
connectionMeter.reset();
activityMonitoringMeter.reset();
}
@Override
public Map<String, Long> getNumberOfCreateEventsSeen() {
return activityMonitoringMeter.getNumberOfCreateEventsSeen();
}
@Override
public Map<String, Long> getNumberOfDeleteEventsSeen() {
return activityMonitoringMeter.getNumberOfDeleteEventsSeen();
}
@Override
public Map<String, Long> getNumberOfUpdateEventsSeen() {
return activityMonitoringMeter.getNumberOfUpdateEventsSeen();
}
}

View File

@ -7,6 +7,7 @@
import io.debezium.pipeline.metrics.traits.ConnectionMetricsMXBean;
import io.debezium.pipeline.metrics.traits.StreamingMetricsMXBean;
import io.debezium.transforms.ActivityMonitoringMXBean;
/**
* Metrics specific to streaming change event sources
@ -14,5 +15,5 @@
* @author Randall Hauch, Jiri Pechanec
*/
public interface StreamingChangeEventSourceMetricsMXBean extends ChangeEventSourceMetricsMXBean,
ConnectionMetricsMXBean, StreamingMetricsMXBean {
ConnectionMetricsMXBean, StreamingMetricsMXBean, ActivityMonitoringMXBean {
}

View File

@ -0,0 +1,17 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.transforms;
import java.util.Map;
public interface ActivityMonitoringMXBean {
Map<String, Long> getNumberOfCreateEventsSeen();
Map<String, Long> getNumberOfDeleteEventsSeen();
Map<String, Long> getNumberOfUpdateEventsSeen();
}

View File

@ -0,0 +1,103 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.transforms;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.data.Envelope;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.spi.schema.DataCollectionId;
public class ActivityMonitoringMeter implements ActivityMonitoringMXBean {
private static final Logger LOGGER = LoggerFactory.getLogger(ActivityMonitoringMeter.class);
private final ActivityCounter createCount = new ActivityCounter();
private final ActivityCounter updateCount = new ActivityCounter();
private final ActivityCounter deleteCount = new ActivityCounter();
public void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value, Envelope.Operation operation) {
LOGGER.trace("Received record {} with key {}", value, key);
String tableName = source.identifier();
switch (operation) {
case CREATE:
createCount.add(1, tableName);
break;
case UPDATE:
updateCount.add(1, tableName);
break;
case DELETE:
deleteCount.add(1, tableName);
break;
default:
break;
}
LOGGER.trace("Counter status create:{}, delete:{}, update:{}", createCount, deleteCount, updateCount);
}
@Override
public Map<String, Long> getNumberOfCreateEventsSeen() {
return createCount.getCounter();
}
@Override
public Map<String, Long> getNumberOfDeleteEventsSeen() {
return deleteCount.getCounter();
}
@Override
public Map<String, Long> getNumberOfUpdateEventsSeen() {
return updateCount.getCounter();
}
public void reset() {
createCount.reset();
updateCount.reset();
deleteCount.reset();
}
public static class ActivityCounter {
private final ConcurrentMap<String, AtomicLong> counterByCollection = new ConcurrentHashMap<>();
public ActivityCounter() {
}
public void add(int increment, String tableName) {
counterByCollection.compute(tableName, (k, v) -> {
if (v == null) {
return new AtomicLong(increment);
}
v.addAndGet(increment);
return v;
});
}
public Map<String, Long> getCounter() {
return counterByCollection.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get()));
}
public void reset() {
counterByCollection.clear();
}
}
}

View File

@ -0,0 +1,69 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.transforms;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Map;
import org.junit.Test;
import io.debezium.data.Envelope;
import io.debezium.relational.TableId;
public class ActivityMonitoringMeterTest {
ActivityMonitoringMeter activityMonitoringMeter;
@Test
public void whenNoEventIsReceivedAndNewCreatedMeterThenNoMetricsMustBeReturned() {
activityMonitoringMeter = new ActivityMonitoringMeter();
assertThat(activityMonitoringMeter.getNumberOfCreateEventsSeen()).isEmpty();
assertThat(activityMonitoringMeter.getNumberOfUpdateEventsSeen()).isEmpty();
assertThat(activityMonitoringMeter.getNumberOfDeleteEventsSeen()).isEmpty();
}
@Test
public void whenInsertEventIsReceivedThenCorrectMetricsMustBeReturned() {
activityMonitoringMeter = new ActivityMonitoringMeter();
activityMonitoringMeter.onEvent(TableId.parse("db.schema.table"), null, 1L, null, Envelope.Operation.CREATE);
activityMonitoringMeter.onEvent(TableId.parse("db.schema.table"), null, 1L, null, Envelope.Operation.CREATE);
activityMonitoringMeter.onEvent(TableId.parse("db.schema.table"), null, 1L, null, Envelope.Operation.UPDATE);
activityMonitoringMeter.onEvent(TableId.parse("db.schema.table"), null, 1L, null, Envelope.Operation.DELETE);
activityMonitoringMeter.onEvent(TableId.parse("db.schema.anotherTable"), null, 1L, null, Envelope.Operation.CREATE);
assertThat(activityMonitoringMeter.getNumberOfCreateEventsSeen())
.contains(Map.entry("db.schema.table", 2L), Map.entry("db.schema.anotherTable", 1L));
assertThat(activityMonitoringMeter.getNumberOfUpdateEventsSeen())
.contains(Map.entry("db.schema.table", 1L));
assertThat(activityMonitoringMeter.getNumberOfDeleteEventsSeen())
.contains(Map.entry("db.schema.table", 1L));
}
@Test
public void whenMeterIsResetThenNoMetricsMustBeReturned() {
activityMonitoringMeter = new ActivityMonitoringMeter();
activityMonitoringMeter.onEvent(TableId.parse("db.schema.table"), null, 1L, null, Envelope.Operation.CREATE);
activityMonitoringMeter.onEvent(TableId.parse("db.schema.table"), null, 1L, null, Envelope.Operation.CREATE);
activityMonitoringMeter.onEvent(TableId.parse("db.schema.table"), null, 1L, null, Envelope.Operation.UPDATE);
activityMonitoringMeter.onEvent(TableId.parse("db.schema.table"), null, 1L, null, Envelope.Operation.DELETE);
activityMonitoringMeter.onEvent(TableId.parse("db.schema.anotherTable"), null, 1L, null, Envelope.Operation.CREATE);
activityMonitoringMeter.reset();
assertThat(activityMonitoringMeter.getNumberOfCreateEventsSeen()).isEmpty();
assertThat(activityMonitoringMeter.getNumberOfUpdateEventsSeen()).isEmpty();
assertThat(activityMonitoringMeter.getNumberOfDeleteEventsSeen()).isEmpty();
}
}