DBZ-5011 Set task metrics attribute when using only 1 task

This commit is contained in:
Chris Cranford 2022-05-27 11:19:46 -04:00 committed by Jiri Pechanec
parent 9fa5a48274
commit 28cd36edff
7 changed files with 14 additions and 34 deletions

View File

@ -121,7 +121,7 @@ public ChangeEventSourceCoordinator<MongoDbPartition, MongoDbOffsetContext> star
replicaSets,
taskContext,
schema),
new MongoDbChangeEventSourceMetricsFactory(connectorConfig),
new MongoDbChangeEventSourceMetricsFactory(),
dispatcher,
schema);

View File

@ -7,7 +7,6 @@
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.MongoDbPartition;
import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
@ -19,23 +18,17 @@
*/
public class MongoDbChangeEventSourceMetricsFactory extends DefaultChangeEventSourceMetricsFactory<MongoDbPartition> {
private final int maxTasks;
public MongoDbChangeEventSourceMetricsFactory(MongoDbConnectorConfig connectorConfig) {
this.maxTasks = connectorConfig.getConfig().getInteger("tasks.max", 1);
}
@Override
public <T extends CdcSourceTaskContext> SnapshotChangeEventSourceMetrics<MongoDbPartition> getSnapshotMetrics(T taskContext,
ChangeEventQueueMetrics changeEventQueueMetrics,
EventMetadataProvider eventMetadataProvider) {
return new MongoDbSnapshotChangeEventSourceMetrics(taskContext, changeEventQueueMetrics, eventMetadataProvider, maxTasks);
return new MongoDbSnapshotChangeEventSourceMetrics(taskContext, changeEventQueueMetrics, eventMetadataProvider);
}
@Override
public <T extends CdcSourceTaskContext> StreamingChangeEventSourceMetrics<MongoDbPartition> getStreamingMetrics(T taskContext,
ChangeEventQueueMetrics changeEventQueueMetrics,
EventMetadataProvider eventMetadataProvider) {
return new MongoDbStreamingChangeEventSourceMetrics(taskContext, changeEventQueueMetrics, eventMetadataProvider, maxTasks);
return new MongoDbStreamingChangeEventSourceMetrics(taskContext, changeEventQueueMetrics, eventMetadataProvider);
}
}

View File

@ -27,11 +27,9 @@ public class MongoDbSnapshotChangeEventSourceMetrics extends DefaultSnapshotChan
private AtomicLong numberOfDisconnects = new AtomicLong();
public <T extends CdcSourceTaskContext> MongoDbSnapshotChangeEventSourceMetrics(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics,
EventMetadataProvider metadataProvider, int maxTasks) {
EventMetadataProvider metadataProvider) {
super(taskContext, changeEventQueueMetrics, metadataProvider,
maxTasks > 1
? Collect.linkMapOf("context", "snapshot", "server", taskContext.getConnectorName(), "task", taskContext.getTaskId())
: Collect.linkMapOf("context", "snapshot", "server", taskContext.getConnectorName()));
Collect.linkMapOf("context", "snapshot", "server", taskContext.getConnectorName(), "task", taskContext.getTaskId()));
}
@Override

View File

@ -29,11 +29,9 @@ public class MongoDbStreamingChangeEventSourceMetrics extends DefaultStreamingCh
private AtomicLong numberOfDisconnects = new AtomicLong();
<T extends CdcSourceTaskContext> MongoDbStreamingChangeEventSourceMetrics(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics,
EventMetadataProvider eventMetadataProvider, int maxTasks) {
EventMetadataProvider eventMetadataProvider) {
super(taskContext, changeEventQueueMetrics, eventMetadataProvider,
maxTasks > 1
? Collect.linkMapOf("context", "streaming", "server", taskContext.getConnectorName(), "task", taskContext.getTaskId())
: Collect.linkMapOf("context", "streaming", "server", taskContext.getConnectorName()));
Collect.linkMapOf("context", "streaming", "server", taskContext.getConnectorName(), "task", taskContext.getTaskId()));
}
@Override

View File

@ -51,6 +51,9 @@
*/
public abstract class AbstractMongoConnectorIT extends AbstractConnectorTest {
// the one and only task we start in the test suite
private static final int TASK_ID = 0;
protected Configuration config;
protected MongoDbTaskContext context;
protected LogInterceptor logInterceptor;
@ -307,7 +310,7 @@ protected void storeDocuments(MongoCollection<Document> collection, String pathO
}
public static ObjectName getSnapshotMetricsObjectName(String connector, String server) {
return getMetricsObjectNameWithTags(connector, Collect.linkMapOf("context", "snapshot", "server", server));
return getSnapshotMetricsObjectName(connector, server, TASK_ID);
}
public static ObjectName getSnapshotMetricsObjectName(String connector, String server, int taskId) {
@ -324,7 +327,7 @@ public static void waitForSnapshotToBeCompleted(String connector, String server,
}
public static ObjectName getStreamingMetricsObjectName(String connector, String server) {
return getMetricsObjectNameWithTags(connector, Collect.linkMapOf("context", getStreamingNamespace(), "server", server));
return getStreamingMetricsObjectName(connector, server, TASK_ID);
}
public static ObjectName getStreamingMetricsObjectName(String connector, String server, int taskId) {

View File

@ -1,11 +1,5 @@
ifeval::['{context}' == 'mongodb']
The *MBean* is `debezium.{mbean-name}:type=connector-metrics,context=snapshot,server=_<{context}.server.name>_`.
[NOTE]
====
When the connector is configured with `tasks.max` greater than `1`, the MBean name will also include the task identifier.
As an example, when using a value of `2`, the MBean names will also be suffixed by `,task=0` and `,task=1` respectively.
====
The *MBean* is `debezium.{mbean-name}:type=connector-metrics,context=snapshot,server=_<{context}.server.name>_,task=_<task.id>_`.
endif::[]
ifeval::['{context}' != 'mongodb']
The *MBean* is `debezium.{mbean-name}:type=connector-metrics,context=snapshot,server=_<{context}.server.name>_`.

View File

@ -1,11 +1,5 @@
ifeval::['{context}' == 'mongodb']
The *MBean* is `debezium.{mbean-name}:type=connector-metrics,context=streaming,server=_<{context}.server.name>_`.
[NOTE]
====
When the connector is configured with `tasks.max` greater than `1`, the MBean name will also include the task identifier.
As an example, when using a value of `2`, the MBean names will also be suffixed by `,task=0` and `,task=1` respectively.
====
The *MBean* is `debezium.{mbean-name}:type=connector-metrics,context=streaming,server=_<{context}.server.name>_,task=_<task.id>_`.
endif::[]
ifeval::['{context}' != 'mongodb']
The *MBean* is `debezium.{mbean-name}:type=connector-metrics,context=streaming,server=_<{context}.server.name>_`.