DBZ-7177 DBZ-6764 Use correct context name and add suggestions for code review
This commit is contained in:
parent
5a27d42257
commit
4b36ed9975
@ -9,7 +9,6 @@
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import javax.management.MBeanServer;
|
||||
import javax.management.MalformedObjectNameException;
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.GET;
|
||||
@ -59,13 +58,10 @@ public MongoDbConnector getConnector() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetricsDescriptor getMetrics(String connectorName, MBeanServer mBeanServer)
|
||||
public MetricsDescriptor getMetrics(String connectorName)
|
||||
throws MalformedObjectNameException {
|
||||
Map<String, String> connectorConfig = connectClusterState.connectorConfig(connectorName);
|
||||
String serverName = connectorConfig.get("topic.prefix");
|
||||
String tasksMax = connectorConfig.get("tasks.max");
|
||||
|
||||
return queryMetrics(connectorName, Module.name(), "streaming", mBeanServer, serverName, tasksMax, null);
|
||||
return queryMetrics(connectorConfig, connectorName, Module.contextName().toLowerCase(), "streaming");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -9,7 +9,6 @@
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import javax.management.MBeanServer;
|
||||
import javax.management.MalformedObjectNameException;
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.GET;
|
||||
@ -58,12 +57,9 @@ public MySqlConnector getConnector() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetricsDescriptor getMetrics(String connectorName, MBeanServer mBeanServer) throws MalformedObjectNameException {
|
||||
public MetricsDescriptor getMetrics(String connectorName) throws MalformedObjectNameException {
|
||||
Map<String, String> connectorConfig = connectClusterState.connectorConfig(connectorName);
|
||||
String serverName = connectorConfig.get("topic.prefix");
|
||||
String tasksMax = connectorConfig.get("tasks.max");
|
||||
|
||||
return queryMetrics(connectorName, Module.name(), "streaming", mBeanServer, serverName, tasksMax, null);
|
||||
return queryMetrics(connectorConfig, connectorName, Module.contextName().toLowerCase(), "streaming");
|
||||
}
|
||||
|
||||
@GET
|
||||
|
@ -6,9 +6,9 @@
|
||||
package io.debezium.connector.oracle.rest;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import javax.management.MBeanServer;
|
||||
import javax.management.MalformedObjectNameException;
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.GET;
|
||||
@ -57,11 +57,9 @@ public OracleConnector getConnector() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetricsDescriptor getMetrics(String connectorName, MBeanServer mBeanServer) throws MalformedObjectNameException {
|
||||
String serverName = connectClusterState.connectorConfig(connectorName).get("topic.prefix");
|
||||
String tasksMax = connectClusterState.connectorConfig(connectorName).get("tasks.max");
|
||||
|
||||
return queryMetrics(connectorName, Module.name(), "streaming", mBeanServer, serverName, tasksMax, null);
|
||||
public MetricsDescriptor getMetrics(String connectorName) throws MalformedObjectNameException {
|
||||
Map<String, String> connectorConfig = connectClusterState.connectorConfig(connectorName);
|
||||
return queryMetrics(connectorConfig, connectorName, Module.contextName().toLowerCase(), "streaming");
|
||||
}
|
||||
|
||||
@GET
|
||||
|
@ -35,11 +35,4 @@ public static String name() {
|
||||
public static String contextName() {
|
||||
return "Postgres";
|
||||
}
|
||||
|
||||
/**
|
||||
* @return logical name of the connector used in mbean object name
|
||||
*/
|
||||
public static String logicalName() {
|
||||
return "postgres";
|
||||
}
|
||||
}
|
||||
|
@ -10,7 +10,6 @@
|
||||
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import javax.management.MBeanServer;
|
||||
import javax.management.MalformedObjectNameException;
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.GET;
|
||||
@ -60,12 +59,9 @@ public PostgresConnector getConnector() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetricsDescriptor getMetrics(String connectorName, MBeanServer mBeanServer) throws MalformedObjectNameException {
|
||||
public MetricsDescriptor getMetrics(String connectorName) throws MalformedObjectNameException {
|
||||
Map<String, String> connectorConfig = connectClusterState.connectorConfig(connectorName);
|
||||
String serverName = connectorConfig.get("topic.prefix");
|
||||
String tasksMax = connectorConfig.get("tasks.max");
|
||||
|
||||
return queryMetrics(connectorName, Module.logicalName(), "streaming", mBeanServer, serverName, tasksMax, null);
|
||||
return queryMetrics(connectorConfig, connectorName, Module.contextName().toLowerCase(), "streaming");
|
||||
}
|
||||
|
||||
public String getSchemaFilePath() {
|
||||
|
@ -35,11 +35,4 @@ public static String name() {
|
||||
public static String contextName() {
|
||||
return "SQL_Server";
|
||||
}
|
||||
|
||||
/**
|
||||
* @return logical name of the connector used in mbean object name
|
||||
*/
|
||||
public static String logicalName() {
|
||||
return "sql_server";
|
||||
}
|
||||
}
|
||||
|
@ -9,7 +9,6 @@
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import javax.management.MBeanServer;
|
||||
import javax.management.MalformedObjectNameException;
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.GET;
|
||||
@ -59,15 +58,10 @@ public SqlServerConnector getConnector() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetricsDescriptor getMetrics(String connectorName, MBeanServer mBeanServer)
|
||||
public MetricsDescriptor getMetrics(String connectorName)
|
||||
throws MalformedObjectNameException {
|
||||
Map<String, String> connectorConfig = connectClusterState.connectorConfig(connectorName);
|
||||
String serverName = connectorConfig.get("topic.prefix");
|
||||
String tasksMax = connectorConfig.get("tasks.max");
|
||||
String databaseNames = connectorConfig.get("database.names");
|
||||
String[] namespaces = databaseNames.split(",");
|
||||
|
||||
return queryMetrics(connectorName, Module.logicalName(), "streaming", mBeanServer, serverName, tasksMax, namespaces);
|
||||
return queryMetrics(connectorConfig, connectorName, Module.contextName().toLowerCase(), "streaming");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -32,22 +32,23 @@
|
||||
|
||||
public interface MetricsResource extends MetricsAttributes {
|
||||
String CONNECTOR_METRICS_ENDPOINT = "/connectors/{connector-name}/metrics";
|
||||
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
|
||||
|
||||
Connector getConnector();
|
||||
|
||||
MetricsDescriptor getMetrics(String connectorName, MBeanServer mBeanServer)
|
||||
MetricsDescriptor getMetrics(String connectorName)
|
||||
throws MalformedObjectNameException, InstanceNotFoundException, IntrospectionException, ReflectionException, AttributeNotFoundException, MBeanException;
|
||||
|
||||
default ObjectName getObjectName(String connector, String context, String serverName) throws MalformedObjectNameException {
|
||||
return new ObjectName(String.format("debezium.%s:type=connector-metrics,context=%s,server=%s", connector, context, serverName));
|
||||
}
|
||||
|
||||
default ObjectName getObjectNameWithTask(String connector, String context, String serverName, String task) throws MalformedObjectNameException {
|
||||
default ObjectName getObjectName(String connector, String context, String serverName, String task) throws MalformedObjectNameException {
|
||||
return new ObjectName(
|
||||
String.format("debezium.%s:type=connector-metrics,context=%s,server=%s,task=%s", connector, context, serverName, task));
|
||||
}
|
||||
|
||||
default ObjectName getObjectNameWithDatabase(String connector, String context, String serverName, String task, String databaseName)
|
||||
default ObjectName getObjectName(String connector, String context, String serverName, String task, String databaseName)
|
||||
throws MalformedObjectNameException {
|
||||
return new ObjectName(
|
||||
String.format("debezium.%s:type=connector-metrics,context=%s,server=%s,task=%s,database=%s", connector, context, serverName, task, databaseName));
|
||||
@ -69,16 +70,24 @@ default String getAttributeValue(String attribute, ObjectName objectName, String
|
||||
}
|
||||
}
|
||||
|
||||
default MetricsDescriptor queryMetrics(String connectorName, String connector, String context, MBeanServer mBeanServer, String serverName, String tasksMax,
|
||||
String[] namespaces)
|
||||
default MetricsDescriptor queryMetrics(Map<String, String> connectorConfig, String connectorName, String connector, String context)
|
||||
throws MalformedObjectNameException {
|
||||
|
||||
String serverName = connectorConfig.get("topic.prefix");
|
||||
String tasksMax = connectorConfig.get("tasks.max");
|
||||
String databaseNames = connectorConfig.get("database.names");
|
||||
String[] namespaces = null;
|
||||
if (databaseNames != null) {
|
||||
namespaces = databaseNames.split(",");
|
||||
}
|
||||
|
||||
List<MetricsDescriptor.Task> tasksPayload = new ArrayList<>();
|
||||
MetricsDescriptor.Connector connectorPayload = null;
|
||||
|
||||
for (int task = 0; task < Integer.parseInt(tasksMax); task++) {
|
||||
ObjectName objectName;
|
||||
if (connector.equals("sql_server") || connector.equals("mongodb")) {
|
||||
objectName = getObjectNameWithTask(connector, context, serverName, String.valueOf(task));
|
||||
objectName = getObjectName(connector, context, serverName, String.valueOf(task));
|
||||
}
|
||||
else {
|
||||
objectName = getObjectName(connector, context, serverName);
|
||||
@ -91,7 +100,7 @@ default MetricsDescriptor queryMetrics(String connectorName, String connector, S
|
||||
Map<String, String> connectorAttributes;
|
||||
if (namespaces != null) {
|
||||
for (String namespace : namespaces) {
|
||||
ObjectName objectNameWithNamespace = getObjectNameWithDatabase(connector, context, serverName, String.valueOf(task), namespace);
|
||||
ObjectName objectNameWithNamespace = getObjectName(connector, context, serverName, String.valueOf(task), namespace);
|
||||
connectorAttributes = getAttributes(getConnectorAttributes(), objectNameWithNamespace, connectorName, mBeanServer);
|
||||
namespacesPayload.add(new MetricsDescriptor.Namespace(namespace, connectorAttributes));
|
||||
}
|
||||
@ -125,8 +134,6 @@ default MetricsDescriptor getConnectorMetrics(@PathParam("connector-name") Strin
|
||||
if (getConnector() == null) {
|
||||
throw new RuntimeException("Unable to fetch metrics for connector " + connectorName + " as the connector is not available.");
|
||||
}
|
||||
|
||||
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
|
||||
return getMetrics(connectorName, mBeanServer);
|
||||
return getMetrics(connectorName);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user