DBZ-6603 Support for custom tags in the connector metrics

This commit is contained in:
harveyyue 2023-07-02 13:06:23 +08:00 committed by Jiri Pechanec
parent e52e0f3182
commit 5bc7fd1f7c
11 changed files with 105 additions and 16 deletions

View File

@ -30,7 +30,11 @@ public class MongoDbTaskContext extends CdcSourceTaskContext {
* @param config the configuration
*/
public MongoDbTaskContext(Configuration config) {
super(Module.contextName(), config.getString(CommonConnectorConfig.TOPIC_PREFIX), config.getString(MongoDbConnectorConfig.TASK_ID), Collections::emptySet);
super(Module.contextName(),
config.getString(CommonConnectorConfig.TOPIC_PREFIX),
config.getString(MongoDbConnectorConfig.TASK_ID),
new MongoDbConnectorConfig(config).getCustomMetricTags(),
Collections::emptySet);
this.filters = new Filters(config);
this.connectorConfig = new MongoDbConnectorConfig(config);

View File

@ -24,7 +24,7 @@
public class MongoDbSnapshotChangeEventSourceMetrics extends DefaultSnapshotChangeEventSourceMetrics<MongoDbPartition>
implements MongoDbSnapshotChangeEventSourceMetricsMBean {
private AtomicLong numberOfDisconnects = new AtomicLong();
private final AtomicLong numberOfDisconnects = new AtomicLong();
public <T extends CdcSourceTaskContext> MongoDbSnapshotChangeEventSourceMetrics(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics,
EventMetadataProvider metadataProvider) {

View File

@ -21,7 +21,7 @@ public class MySqlTaskContext extends CdcSourceTaskContext {
private final BinaryLogClient binaryLogClient;
public MySqlTaskContext(MySqlConnectorConfig config, MySqlDatabaseSchema schema) {
super(config.getContextName(), config.getLogicalName(), schema::tableIds);
super(config.getContextName(), config.getLogicalName(), config.getCustomMetricTags(), schema::tableIds);
this.schema = schema;
this.binaryLogClient = new BinaryLogClient(config.hostname(), config.port(), config.username(), config.password());
}

View File

@ -10,6 +10,6 @@
public class OracleTaskContext extends CdcSourceTaskContext {
public OracleTaskContext(OracleConnectorConfig config, OracleDatabaseSchema schema) {
super(config.getContextName(), config.getLogicalName(), schema::tableIds);
super(config.getContextName(), config.getLogicalName(), config.getCustomMetricTags(), schema::tableIds);
}
}

View File

@ -41,7 +41,7 @@ public class PostgresTaskContext extends CdcSourceTaskContext {
private Long lastXmin;
protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, TopicNamingStrategy<TableId> topicNamingStrategy) {
super(config.getContextName(), config.getLogicalName(), Collections::emptySet);
super(config.getContextName(), config.getLogicalName(), config.getCustomMetricTags(), Collections::emptySet);
this.config = config;
if (config.xminFetchInterval().toMillis() > 0) {

View File

@ -16,6 +16,6 @@
public class SqlServerTaskContext extends CdcSourceTaskContext {
public SqlServerTaskContext(SqlServerConnectorConfig config, SqlServerDatabaseSchema schema) {
super(config.getContextName(), config.getLogicalName(), config.getTaskId(), schema::tableIds);
super(config.getContextName(), config.getLogicalName(), config.getTaskId(), config.getCustomMetricTags(), schema::tableIds);
}
}

View File

@ -10,7 +10,10 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
@ -697,7 +700,7 @@ public static FieldNameAdjustmentMode parse(String value) {
public static final Field MAX_RETRIES_ON_ERROR = Field.create(ERRORS_MAX_RETRIES)
.withDisplayName("The maximum number of retries")
.withType(Type.INT)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 22))
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 24))
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withDefault(DEFAULT_MAX_RETRIES)
@ -705,6 +708,17 @@ public static FieldNameAdjustmentMode parse(String value) {
.withDescription(
"The maximum number of retries on connection errors before failing (-1 = no limit, 0 = disabled, > 0 = num of retries).");
public static final Field CUSTOM_METRIC_TAGS = Field.create("custom.metric.tags")
.withDisplayName("Customize metric tags")
.withType(Type.LIST)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 25))
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withValidation(Field::isListOfMap)
.withDescription("The custom metric tags will accept key-value pairs to customize the MBean object name, " +
"each key would represent a tag for the MBean object name, " +
"and the corresponding value would be the value of that tag the key is. For example: k1=v1,k2=v2");
protected static final ConfigDefinition CONFIG_DEFINITION = ConfigDefinition.editor()
.connector(
EVENT_PROCESSING_FAILURE_HANDLING_MODE,
@ -731,7 +745,8 @@ public static FieldNameAdjustmentMode parse(String value) {
SIGNAL_ENABLED_CHANNELS,
TOPIC_NAMING_STRATEGY,
NOTIFICATION_ENABLED_CHANNELS,
SinkNotificationChannel.NOTIFICATION_TOPIC)
SinkNotificationChannel.NOTIFICATION_TOPIC,
CUSTOM_METRIC_TAGS)
.create();
private final Configuration config;
@ -768,6 +783,7 @@ public static FieldNameAdjustmentMode parse(String value) {
private final String notificationTopicName;
private final List<String> enabledNotificationChannels;
private final Map<String, String> customMetricTags;
protected CommonConnectorConfig(Configuration config, int defaultSnapshotFetchSize) {
this.config = config;
@ -802,6 +818,7 @@ protected CommonConnectorConfig(Configuration config, int defaultSnapshotFetchSi
this.enabledNotificationChannels = config.getList(NOTIFICATION_ENABLED_CHANNELS);
this.skipMessagesWithoutChange = config.getBoolean(SKIP_MESSAGES_WITHOUT_CHANGE);
this.maxRetriesOnError = config.getInteger(MAX_RETRIES_ON_ERROR);
this.customMetricTags = createCustomMetricTags(config);
}
private static List<String> getSignalEnabledChannels(Configuration config) {
@ -987,6 +1004,31 @@ public Set<Pattern> getDataCollectionsToBeSnapshotted() {
.orElseGet(Collections::emptySet);
}
public Map<String, String> getCustomMetricTags() {
return customMetricTags;
}
public Map<String, String> createCustomMetricTags(Configuration config) {
// Keep the map custom metric tags sequence
HashMap<String, String> result = new LinkedHashMap<>();
String rawValue = config.getString(CUSTOM_METRIC_TAGS);
if (Strings.isNullOrBlank(rawValue)) {
return result;
}
List<String> values = Strings.listOf(rawValue, x -> x.split(","), String::trim);
for (String v : values) {
List<String> items = Strings.listOf(v, x -> x.split("="), String::trim);
result.put(items.get(0), items.get(1));
}
if (result.size() < values.size()) {
LOGGER.warn("There are duplicated key-value pairs: {}", rawValue);
}
return result;
}
/**
* @return true if the connector should emit messages about schema changes into a public facing
* topic.

View File

@ -1171,6 +1171,23 @@ public static int isListOfRegex(Configuration config, Field field, ValidationOut
return errors;
}
public static int isListOfMap(Configuration config, Field field, ValidationOutput problems) {
String value = config.getString(field);
int errors = 0;
if (!Strings.isNullOrBlank(value)) {
List<String> values = Strings.listOf(value, x -> x.split(","), String::trim);
for (String v : values) {
List<String> items = Strings.listOf(v, x -> x.split("="), String::trim);
if (items.size() != 2) {
problems.accept(field, value, "A equivalent-separated map of valid key/value pairs is expected, for example: k1=v1,k2=v2");
return ++errors;
}
}
}
return errors;
}
public static int isRegex(Configuration config, Field field, ValidationOutput problems) {
String value = config.getString(field);
int errors = 0;

View File

@ -7,6 +7,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.kafka.connect.source.SourceTask;
@ -27,6 +28,7 @@ public class CdcSourceTaskContext {
private final String connectorType;
private final String connectorName;
private final String taskId;
private final Map<String, String> customMetricTags;
private final Clock clock;
/**
@ -34,17 +36,25 @@ public class CdcSourceTaskContext {
*/
private final Supplier<Collection<? extends DataCollectionId>> collectionsSupplier;
public CdcSourceTaskContext(String connectorType, String connectorName, String taskId, Supplier<Collection<? extends DataCollectionId>> collectionsSupplier) {
public CdcSourceTaskContext(String connectorType,
String connectorName,
String taskId,
Map<String, String> customMetricTags,
Supplier<Collection<? extends DataCollectionId>> collectionsSupplier) {
this.connectorType = connectorType;
this.connectorName = connectorName;
this.taskId = taskId;
this.customMetricTags = customMetricTags;
this.collectionsSupplier = collectionsSupplier != null ? collectionsSupplier : Collections::emptyList;
this.clock = Clock.system();
}
public CdcSourceTaskContext(String connectorType, String connectorName, Supplier<Collection<? extends DataCollectionId>> collectionsSupplier) {
this(connectorType, connectorName, "0", collectionsSupplier);
public CdcSourceTaskContext(String connectorType,
String connectorName,
Map<String, String> customMetricTags,
Supplier<Collection<? extends DataCollectionId>> collectionsSupplier) {
this(connectorType, connectorName, "0", customMetricTags, collectionsSupplier);
}
/**
@ -100,4 +110,8 @@ public String getConnectorName() {
public String getTaskId() {
return taskId;
}
public Map<String, String> getCustomMetricTags() {
return customMetricTags;
}
}

View File

@ -32,10 +32,11 @@ public abstract class Metrics {
private volatile boolean registered = false;
protected Metrics(CdcSourceTaskContext taskContext, String contextName) {
this.name = metricName(taskContext.getConnectorType(), taskContext.getConnectorName(), contextName);
this.name = metricName(taskContext.getConnectorType(), taskContext.getConnectorName(), contextName, taskContext.getCustomMetricTags());
}
protected Metrics(CdcSourceTaskContext taskContext, Map<String, String> tags) {
tags.putAll(taskContext.getCustomMetricTags());
this.name = metricName(taskContext.getConnectorType(), tags);
}
@ -49,7 +50,7 @@ protected Metrics(CommonConnectorConfig connectorConfig, String contextName, boo
"context", contextName));
}
else {
this.name = metricName(connectorType, connectorName, contextName);
this.name = metricName(connectorType, connectorName, contextName, connectorConfig.getCustomMetricTags());
}
}
@ -76,8 +77,10 @@ public synchronized void unregister() {
}
}
protected ObjectName metricName(String connectorType, String connectorName, String contextName) {
return metricName(connectorType, Collect.linkMapOf("context", contextName, "server", connectorName));
protected ObjectName metricName(String connectorType, String connectorName, String contextName, Map<String, String> customTags) {
Map<String, String> tags = Collect.linkMapOf("context", contextName, "server", connectorName);
tags.putAll(customTags);
return metricName(connectorType, tags);
}
/**

View File

@ -7,6 +7,7 @@
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.stream.Collectors;
import javax.management.InstanceAlreadyExistsException;
import javax.management.InstanceNotFoundException;
@ -15,6 +16,7 @@
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.kafka.common.utils.Sanitizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -113,6 +115,13 @@ public static void unregisterMXBean(CommonConnectorConfig connectorConfig, Strin
}
private static String getManagementJmxObjectName(String type, String context, CommonConnectorConfig connectorConfig) {
return String.format(JMX_OBJECT_NAME_FORMAT, connectorConfig.getContextName().toLowerCase(), type, context, connectorConfig.getLogicalName());
String tags = String.format(JMX_OBJECT_NAME_FORMAT, connectorConfig.getContextName().toLowerCase(), type, context, connectorConfig.getLogicalName());
if (connectorConfig.getCustomMetricTags().size() > 0) {
String customTags = connectorConfig.getCustomMetricTags().entrySet().stream()
.map(e -> e.getKey() + "=" + Sanitizer.jmxSanitize(e.getValue()))
.collect(Collectors.joining(","));
tags += "," + customTags;
}
return tags;
}
}