DBZ-4783: Add task id label to metrics

This commit is contained in:
Josh Ribera 2021-04-28 13:24:34 -04:00 committed by Gunnar Morling
parent 37a67d5dcf
commit fc6d55e935
9 changed files with 28 additions and 7 deletions

View File

@ -5,6 +5,8 @@
*/
package io.debezium.connector.sqlserver;
import static io.debezium.config.CommonConnectorConfig.TASK_ID;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
@ -64,6 +66,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
final String realDatabaseName = connection.retrieveRealDatabaseName(databaseName);
if (!sqlServerConfig.isMultiPartitionModeEnabled()) {
taskConfig.put(SqlServerConnectorConfig.DATABASE_NAME.name(), realDatabaseName);
taskConfig.put(TASK_ID, "0");
}
else {
taskConfig.put(SqlServerConnectorConfig.DATABASE_NAMES.name(), realDatabaseName);

View File

@ -16,6 +16,6 @@
public class SqlServerTaskContext extends CdcSourceTaskContext {
public SqlServerTaskContext(SqlServerConnectorConfig config, SqlServerDatabaseSchema schema) {
super(config.getContextName(), config.getLogicalName(), schema::tableIds);
super(config.getContextName(), config.getLogicalName(), config.getTaskId(), schema::tableIds);
}
}

View File

@ -40,7 +40,7 @@ abstract class AbstractSqlServerTaskMetrics<B extends AbstractSqlServerPartition
Function<SqlServerPartition, B> beanFactory) {
super(taskContext, Collect.linkMapOf(
"server", taskContext.getConnectorName(),
"task", "0",
"task", taskContext.getTaskId(),
"context", contextName));
this.changeEventQueueMetrics = changeEventQueueMetrics;

View File

@ -27,7 +27,7 @@ class SqlServerSnapshotTaskMetrics extends AbstractSqlServerTaskMetrics<SqlServe
(SqlServerPartition partition) -> new SqlServerSnapshotPartitionMetrics(taskContext,
Collect.linkMapOf(
"server", taskContext.getConnectorName(),
"task", "0",
"task", taskContext.getTaskId(),
"context", "snapshot",
"database", partition.getDatabaseName()),
metadataProvider));

View File

@ -28,7 +28,7 @@ class SqlServerStreamingTaskMetrics extends AbstractSqlServerTaskMetrics<SqlServ
(SqlServerPartition partition) -> new SqlServerStreamingPartitionMetrics(taskContext,
Collect.linkMapOf(
"server", taskContext.getConnectorName(),
"task", "0",
"task", taskContext.getTaskId(),
"context", "streaming",
"database", partition.getDatabaseName()),
metadataProvider));

View File

@ -58,6 +58,7 @@ public class TestHelper {
public static final String TEST_DATABASE = "testDB";
private static final String TEST_PROPERTY_PREFIX = "debezium.test.";
private static final String TEST_TASK_ID = "0";
private static final String STATEMENTS_PLACEHOLDER = "#";
private static final String ENABLE_DB_CDC = "IF EXISTS(select 1 from sys.databases where name='#' AND is_cdc_enabled=0)\n"
@ -467,7 +468,7 @@ private static ObjectName getObjectName(String context, String serverName) {
private static ObjectName getObjectName(String context, String serverName, String databaseName) {
return getObjectName(Collect.linkMapOf(
"server", serverName,
"task", "0",
"task", TEST_TASK_ID,
"context", context,
"database", databaseName));
}

View File

@ -47,6 +47,7 @@
* @author Gunnar Morling
*/
public abstract class CommonConnectorConfig {
public static final String TASK_ID = "task.id";
private static final Logger LOGGER = LoggerFactory.getLogger(CommonConnectorConfig.class);
@ -538,6 +539,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
private final String signalingDataCollection;
private final EnumSet<Operation> skippedOperations;
private final String transactionTopic;
private final String taskId;
protected CommonConnectorConfig(Configuration config, String logicalName, int defaultSnapshotFetchSize) {
this.config = config;
@ -565,6 +567,7 @@ protected CommonConnectorConfig(Configuration config, String logicalName, int de
this.signalingDataCollection = config.getString(SIGNAL_DATA_COLLECTION);
this.skippedOperations = determineSkippedOperations(config);
this.transactionTopic = config.getString(TRANSACTION_TOPIC).replace("${database.server.name}", logicalName);
this.taskId = config.getString(TASK_ID);
}
private static EnumSet<Envelope.Operation> determineSkippedOperations(Configuration config) {
@ -875,4 +878,8 @@ public boolean isSignalDataCollection(DataCollectionId dataCollectionId) {
public Optional<String> customRetriableException() {
return Optional.ofNullable(config.getString(CUSTOM_RETRIABLE_EXCEPTION));
}
public String getTaskId() {
return taskId;
}
}

View File

@ -25,6 +25,7 @@ public class CdcSourceTaskContext {
private final String connectorType;
private final String connectorName;
private final String taskId;
private final Clock clock;
/**
@ -32,14 +33,19 @@ public class CdcSourceTaskContext {
*/
private final Supplier<Collection<? extends DataCollectionId>> collectionsSupplier;
public CdcSourceTaskContext(String connectorType, String connectorName, Supplier<Collection<? extends DataCollectionId>> collectionsSupplier) {
public CdcSourceTaskContext(String connectorType, String connectorName, String taskId, Supplier<Collection<? extends DataCollectionId>> collectionsSupplier) {
this.connectorType = connectorType;
this.connectorName = connectorName;
this.taskId = taskId;
this.collectionsSupplier = collectionsSupplier != null ? collectionsSupplier : Collections::emptyList;
this.clock = Clock.system();
}
public CdcSourceTaskContext(String connectorType, String connectorName, Supplier<Collection<? extends DataCollectionId>> collectionsSupplier) {
this(connectorType, connectorName, "0", collectionsSupplier);
}
/**
* Configure the logger's Mapped Diagnostic Context (MDC) properties for the thread making this call.
*
@ -85,4 +91,8 @@ public String getConnectorType() {
public String getConnectorName() {
return connectorName;
}
public String getTaskId() {
return taskId;
}
}

View File

@ -51,7 +51,7 @@ protected Metrics(CommonConnectorConfig connectorConfig, String contextName, boo
if (multiPartitionMode) {
this.name = metricName(connectorType, Collect.linkMapOf(
"server", connectorName,
"task", "0",
"task", connectorConfig.getTaskId(),
"context", contextName));
}
else {