diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbTaskContext.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbTaskContext.java index a8983e386..f1ab2dd0e 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbTaskContext.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbTaskContext.java @@ -30,7 +30,11 @@ public class MongoDbTaskContext extends CdcSourceTaskContext { * @param config the configuration */ public MongoDbTaskContext(Configuration config) { - super(Module.contextName(), config.getString(CommonConnectorConfig.TOPIC_PREFIX), config.getString(MongoDbConnectorConfig.TASK_ID), Collections::emptySet); + super(Module.contextName(), + config.getString(CommonConnectorConfig.TOPIC_PREFIX), + config.getString(MongoDbConnectorConfig.TASK_ID), + new MongoDbConnectorConfig(config).getCustomMetricTags(), + Collections::emptySet); this.filters = new Filters(config); this.connectorConfig = new MongoDbConnectorConfig(config); diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/metrics/MongoDbSnapshotChangeEventSourceMetrics.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/metrics/MongoDbSnapshotChangeEventSourceMetrics.java index 7187477f6..a0ab069d5 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/metrics/MongoDbSnapshotChangeEventSourceMetrics.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/metrics/MongoDbSnapshotChangeEventSourceMetrics.java @@ -24,7 +24,7 @@ public class MongoDbSnapshotChangeEventSourceMetrics extends DefaultSnapshotChangeEventSourceMetrics implements MongoDbSnapshotChangeEventSourceMetricsMBean { - private AtomicLong numberOfDisconnects = new AtomicLong(); + private final AtomicLong numberOfDisconnects = new AtomicLong(); public MongoDbSnapshotChangeEventSourceMetrics(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics, EventMetadataProvider metadataProvider) { diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java index 52654c03b..267d61317 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java @@ -21,7 +21,7 @@ public class MySqlTaskContext extends CdcSourceTaskContext { private final BinaryLogClient binaryLogClient; public MySqlTaskContext(MySqlConnectorConfig config, MySqlDatabaseSchema schema) { - super(config.getContextName(), config.getLogicalName(), schema::tableIds); + super(config.getContextName(), config.getLogicalName(), config.getCustomMetricTags(), schema::tableIds); this.schema = schema; this.binaryLogClient = new BinaryLogClient(config.hostname(), config.port(), config.username(), config.password()); } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleTaskContext.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleTaskContext.java index 4192d2bc6..37ecf595d 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleTaskContext.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleTaskContext.java @@ -10,6 +10,6 @@ public class OracleTaskContext extends CdcSourceTaskContext { public OracleTaskContext(OracleConnectorConfig config, OracleDatabaseSchema schema) { - super(config.getContextName(), config.getLogicalName(), schema::tableIds); + super(config.getContextName(), config.getLogicalName(), config.getCustomMetricTags(), schema::tableIds); } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java index cd1c22d73..2cad636b8 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java @@ -41,7 +41,7 @@ public class PostgresTaskContext extends CdcSourceTaskContext { private Long lastXmin; protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, TopicNamingStrategy topicNamingStrategy) { - super(config.getContextName(), config.getLogicalName(), Collections::emptySet); + super(config.getContextName(), config.getLogicalName(), config.getCustomMetricTags(), Collections::emptySet); this.config = config; if (config.xminFetchInterval().toMillis() > 0) { diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerTaskContext.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerTaskContext.java index 142528cb6..a6b26349f 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerTaskContext.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerTaskContext.java @@ -16,6 +16,6 @@ public class SqlServerTaskContext extends CdcSourceTaskContext { public SqlServerTaskContext(SqlServerConnectorConfig config, SqlServerDatabaseSchema schema) { - super(config.getContextName(), config.getLogicalName(), config.getTaskId(), schema::tableIds); + super(config.getContextName(), config.getLogicalName(), config.getTaskId(), config.getCustomMetricTags(), schema::tableIds); } } diff --git a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java index ce2b8903c..24993fadb 100644 --- a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java @@ -10,7 +10,10 @@ import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; @@ -697,7 +700,7 @@ public static FieldNameAdjustmentMode parse(String value) { public static final Field MAX_RETRIES_ON_ERROR = Field.create(ERRORS_MAX_RETRIES) .withDisplayName("The maximum number of retries") .withType(Type.INT) - .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 22)) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 24)) .withWidth(Width.MEDIUM) .withImportance(Importance.LOW) .withDefault(DEFAULT_MAX_RETRIES) @@ -705,6 +708,17 @@ public static FieldNameAdjustmentMode parse(String value) { .withDescription( "The maximum number of retries on connection errors before failing (-1 = no limit, 0 = disabled, > 0 = num of retries)."); + public static final Field CUSTOM_METRIC_TAGS = Field.create("custom.metric.tags") + .withDisplayName("Customize metric tags") + .withType(Type.LIST) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 25)) + .withWidth(Width.MEDIUM) + .withImportance(Importance.LOW) + .withValidation(Field::isListOfMap) + .withDescription("The custom metric tags will accept key-value pairs to customize the MBean object name, " + + "each key would represent a tag for the MBean object name, " + + "and the corresponding value would be the value of that tag the key is. For example: k1=v1,k2=v2"); + protected static final ConfigDefinition CONFIG_DEFINITION = ConfigDefinition.editor() .connector( EVENT_PROCESSING_FAILURE_HANDLING_MODE, @@ -731,7 +745,8 @@ public static FieldNameAdjustmentMode parse(String value) { SIGNAL_ENABLED_CHANNELS, TOPIC_NAMING_STRATEGY, NOTIFICATION_ENABLED_CHANNELS, - SinkNotificationChannel.NOTIFICATION_TOPIC) + SinkNotificationChannel.NOTIFICATION_TOPIC, + CUSTOM_METRIC_TAGS) .create(); private final Configuration config; @@ -768,6 +783,7 @@ public static FieldNameAdjustmentMode parse(String value) { private final String notificationTopicName; private final List enabledNotificationChannels; + private final Map customMetricTags; protected CommonConnectorConfig(Configuration config, int defaultSnapshotFetchSize) { this.config = config; @@ -802,6 +818,7 @@ protected CommonConnectorConfig(Configuration config, int defaultSnapshotFetchSi this.enabledNotificationChannels = config.getList(NOTIFICATION_ENABLED_CHANNELS); this.skipMessagesWithoutChange = config.getBoolean(SKIP_MESSAGES_WITHOUT_CHANGE); this.maxRetriesOnError = config.getInteger(MAX_RETRIES_ON_ERROR); + this.customMetricTags = createCustomMetricTags(config); } private static List getSignalEnabledChannels(Configuration config) { @@ -987,6 +1004,31 @@ public Set getDataCollectionsToBeSnapshotted() { .orElseGet(Collections::emptySet); } + public Map getCustomMetricTags() { + return customMetricTags; + } + + public Map createCustomMetricTags(Configuration config) { + // Keep the map custom metric tags sequence + HashMap result = new LinkedHashMap<>(); + + String rawValue = config.getString(CUSTOM_METRIC_TAGS); + if (Strings.isNullOrBlank(rawValue)) { + return result; + } + + List values = Strings.listOf(rawValue, x -> x.split(","), String::trim); + for (String v : values) { + List items = Strings.listOf(v, x -> x.split("="), String::trim); + result.put(items.get(0), items.get(1)); + } + if (result.size() < values.size()) { + LOGGER.warn("There are duplicated key-value pairs: {}", rawValue); + } + + return result; + } + /** * @return true if the connector should emit messages about schema changes into a public facing * topic. diff --git a/debezium-core/src/main/java/io/debezium/config/Field.java b/debezium-core/src/main/java/io/debezium/config/Field.java index 1437d6bde..a04cd5dad 100644 --- a/debezium-core/src/main/java/io/debezium/config/Field.java +++ b/debezium-core/src/main/java/io/debezium/config/Field.java @@ -1171,6 +1171,23 @@ public static int isListOfRegex(Configuration config, Field field, ValidationOut return errors; } + public static int isListOfMap(Configuration config, Field field, ValidationOutput problems) { + String value = config.getString(field); + int errors = 0; + + if (!Strings.isNullOrBlank(value)) { + List values = Strings.listOf(value, x -> x.split(","), String::trim); + for (String v : values) { + List items = Strings.listOf(v, x -> x.split("="), String::trim); + if (items.size() != 2) { + problems.accept(field, value, "A equivalent-separated map of valid key/value pairs is expected, for example: k1=v1,k2=v2"); + return ++errors; + } + } + } + return errors; + } + public static int isRegex(Configuration config, Field field, ValidationOutput problems) { String value = config.getString(field); int errors = 0; diff --git a/debezium-core/src/main/java/io/debezium/connector/common/CdcSourceTaskContext.java b/debezium-core/src/main/java/io/debezium/connector/common/CdcSourceTaskContext.java index ff0cc3f4f..6f0e26f31 100644 --- a/debezium-core/src/main/java/io/debezium/connector/common/CdcSourceTaskContext.java +++ b/debezium-core/src/main/java/io/debezium/connector/common/CdcSourceTaskContext.java @@ -7,6 +7,7 @@ import java.util.Collection; import java.util.Collections; +import java.util.Map; import java.util.function.Supplier; import org.apache.kafka.connect.source.SourceTask; @@ -27,6 +28,7 @@ public class CdcSourceTaskContext { private final String connectorType; private final String connectorName; private final String taskId; + private final Map customMetricTags; private final Clock clock; /** @@ -34,17 +36,25 @@ public class CdcSourceTaskContext { */ private final Supplier> collectionsSupplier; - public CdcSourceTaskContext(String connectorType, String connectorName, String taskId, Supplier> collectionsSupplier) { + public CdcSourceTaskContext(String connectorType, + String connectorName, + String taskId, + Map customMetricTags, + Supplier> collectionsSupplier) { this.connectorType = connectorType; this.connectorName = connectorName; this.taskId = taskId; + this.customMetricTags = customMetricTags; this.collectionsSupplier = collectionsSupplier != null ? collectionsSupplier : Collections::emptyList; this.clock = Clock.system(); } - public CdcSourceTaskContext(String connectorType, String connectorName, Supplier> collectionsSupplier) { - this(connectorType, connectorName, "0", collectionsSupplier); + public CdcSourceTaskContext(String connectorType, + String connectorName, + Map customMetricTags, + Supplier> collectionsSupplier) { + this(connectorType, connectorName, "0", customMetricTags, collectionsSupplier); } /** @@ -100,4 +110,8 @@ public String getConnectorName() { public String getTaskId() { return taskId; } + + public Map getCustomMetricTags() { + return customMetricTags; + } } diff --git a/debezium-core/src/main/java/io/debezium/metrics/Metrics.java b/debezium-core/src/main/java/io/debezium/metrics/Metrics.java index 5f81956fb..979cfe7d9 100644 --- a/debezium-core/src/main/java/io/debezium/metrics/Metrics.java +++ b/debezium-core/src/main/java/io/debezium/metrics/Metrics.java @@ -32,10 +32,11 @@ public abstract class Metrics { private volatile boolean registered = false; protected Metrics(CdcSourceTaskContext taskContext, String contextName) { - this.name = metricName(taskContext.getConnectorType(), taskContext.getConnectorName(), contextName); + this.name = metricName(taskContext.getConnectorType(), taskContext.getConnectorName(), contextName, taskContext.getCustomMetricTags()); } protected Metrics(CdcSourceTaskContext taskContext, Map tags) { + tags.putAll(taskContext.getCustomMetricTags()); this.name = metricName(taskContext.getConnectorType(), tags); } @@ -49,7 +50,7 @@ protected Metrics(CommonConnectorConfig connectorConfig, String contextName, boo "context", contextName)); } else { - this.name = metricName(connectorType, connectorName, contextName); + this.name = metricName(connectorType, connectorName, contextName, connectorConfig.getCustomMetricTags()); } } @@ -76,8 +77,10 @@ public synchronized void unregister() { } } - protected ObjectName metricName(String connectorType, String connectorName, String contextName) { - return metricName(connectorType, Collect.linkMapOf("context", contextName, "server", connectorName)); + protected ObjectName metricName(String connectorType, String connectorName, String contextName, Map customTags) { + Map tags = Collect.linkMapOf("context", contextName, "server", connectorName); + tags.putAll(customTags); + return metricName(connectorType, tags); } /** diff --git a/debezium-core/src/main/java/io/debezium/pipeline/JmxUtils.java b/debezium-core/src/main/java/io/debezium/pipeline/JmxUtils.java index 4e769bff1..9d7bf1949 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/JmxUtils.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/JmxUtils.java @@ -7,6 +7,7 @@ import java.lang.management.ManagementFactory; import java.time.Duration; +import java.util.stream.Collectors; import javax.management.InstanceAlreadyExistsException; import javax.management.InstanceNotFoundException; @@ -15,6 +16,7 @@ import javax.management.MalformedObjectNameException; import javax.management.ObjectName; +import org.apache.kafka.common.utils.Sanitizer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,6 +115,13 @@ public static void unregisterMXBean(CommonConnectorConfig connectorConfig, Strin } private static String getManagementJmxObjectName(String type, String context, CommonConnectorConfig connectorConfig) { - return String.format(JMX_OBJECT_NAME_FORMAT, connectorConfig.getContextName().toLowerCase(), type, context, connectorConfig.getLogicalName()); + String tags = String.format(JMX_OBJECT_NAME_FORMAT, connectorConfig.getContextName().toLowerCase(), type, context, connectorConfig.getLogicalName()); + if (connectorConfig.getCustomMetricTags().size() > 0) { + String customTags = connectorConfig.getCustomMetricTags().entrySet().stream() + .map(e -> e.getKey() + "=" + Sanitizer.jmxSanitize(e.getValue())) + .collect(Collectors.joining(",")); + tags += "," + customTags; + } + return tags; } }