DBZ-6764 DBZ-7178 DBZ-7177 Use MBeanServer to query metrics for REST endpoint and create new metrics descriptor

This commit is contained in:
ani-sha 2023-12-11 12:33:45 +05:30 committed by Jiri Pechanec
parent ed4e497718
commit 2555a1cee2
20 changed files with 305 additions and 215 deletions

View File

@ -5,9 +5,14 @@
*/ */
package io.debezium.connector.mongodb.rest; package io.debezium.connector.mongodb.rest;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.ws.rs.Consumes; import javax.ws.rs.Consumes;
import javax.ws.rs.GET; import javax.ws.rs.GET;
import javax.ws.rs.Path; import javax.ws.rs.Path;
@ -15,7 +20,6 @@
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import org.apache.kafka.connect.health.ConnectClusterState; import org.apache.kafka.connect.health.ConnectClusterState;
import org.json.simple.JSONObject;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.mongodb.Module; import io.debezium.connector.mongodb.Module;
@ -24,7 +28,8 @@
import io.debezium.rest.FilterValidationResource; import io.debezium.rest.FilterValidationResource;
import io.debezium.rest.MetricsResource; import io.debezium.rest.MetricsResource;
import io.debezium.rest.SchemaResource; import io.debezium.rest.SchemaResource;
import io.debezium.rest.jolokia.JolokiaClient; import io.debezium.rest.metrics.MetricsAttributes;
import io.debezium.rest.metrics.MetricsDescriptor;
import io.debezium.rest.model.DataCollection; import io.debezium.rest.model.DataCollection;
/** /**
@ -57,15 +62,24 @@ public MongoDbConnector getConnector() {
} }
@Override @Override
public String getAttributesFilePath() { public MetricsDescriptor getMetrics(String connectorName, MBeanServer mBeanServer)
return "META-INF/mongodb-attributes.txt"; throws MalformedObjectNameException {
} Map<String, String> connectorConfig = connectClusterState.connectorConfig(connectorName);
String serverName = connectorConfig.get("topic.prefix");
String tasksMax = connectorConfig.get("tasks.max");
@Override MetricsDescriptor.Connector connector = null;
public List<JSONObject> getMetrics(String connectorName, JolokiaClient client, List<String> attributes) { List<MetricsDescriptor.Task> tasks = new ArrayList<>();
String serverName = connectClusterState.connectorConfig(connectorName).get("topic.prefix");
String task = connectClusterState.connectorConfig(connectorName).get("tasks.max"); for (int task = 0; task < Integer.parseInt(tasksMax); task++) {
return client.getConnectorMetrics("mongodb", serverName, Integer.valueOf(task), attributes); ObjectName objectName = getObjectNameWithTask("mongodb", "streaming", serverName, String.valueOf(task));
Map<String, String> connectionAttributes = getAttributes(MetricsAttributes.getConnectionAttributes(), objectName, connectorName, mBeanServer);
Map<String, String> connectorAttributes = getAttributes(MetricsAttributes.getConnectorAttributes(), objectName, connectorName, mBeanServer);
connector = new MetricsDescriptor.Connector(connectionAttributes);
tasks.add(new MetricsDescriptor.Task(task, List.of(new MetricsDescriptor.Database("", connectorAttributes))));
}
return new MetricsDescriptor(connectorName, tasksMax, connector, tasks);
} }
@Override @Override

View File

@ -1,3 +0,0 @@
Connected
MilliSecondsSinceLastEvent
TotalNumberOfEventsSeen

View File

@ -223,13 +223,12 @@ public void testMetricsEndpoint() {
.get(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.CONNECTOR_METRICS_ENDPOINT, connectorName) .get(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.CONNECTOR_METRICS_ENDPOINT, connectorName)
.then().log().all() .then().log().all()
.statusCode(200) .statusCode(200)
.assertThat().body("size()", is(3)) .body("name", equalTo(connectorName))
.body("[0].request.attribute", is("Connected")) .body("connector.metrics.Connected", equalTo("true"))
.body("[0].value", equalTo(true)) .body("tasks[0].id", equalTo(0))
.body("[1].request.attribute", is("MilliSecondsSinceLastEvent")) .body("tasks[0].database[0].metrics.MilliSecondsSinceLastEvent", equalTo("-1"))
.body("[1].value", equalTo(-1)) .body("tasks[0].database[0].metrics.TotalNumberOfEventsSeen", equalTo("0"));
.body("[2].request.attribute", is("TotalNumberOfEventsSeen"))
.body("[2].value", equalTo(0));
} }
public static ConnectorConfiguration getMongoDbConnectorConfiguration(int id, String... options) { public static ConnectorConfiguration getMongoDbConnectorConfiguration(int id, String... options) {

View File

@ -6,8 +6,12 @@
package io.debezium.connector.mysql.rest; package io.debezium.connector.mysql.rest;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.ws.rs.Consumes; import javax.ws.rs.Consumes;
import javax.ws.rs.GET; import javax.ws.rs.GET;
import javax.ws.rs.Path; import javax.ws.rs.Path;
@ -15,7 +19,6 @@
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import org.apache.kafka.connect.health.ConnectClusterState; import org.apache.kafka.connect.health.ConnectClusterState;
import org.json.simple.JSONObject;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.mysql.Module; import io.debezium.connector.mysql.Module;
@ -24,7 +27,8 @@
import io.debezium.rest.FilterValidationResource; import io.debezium.rest.FilterValidationResource;
import io.debezium.rest.MetricsResource; import io.debezium.rest.MetricsResource;
import io.debezium.rest.SchemaResource; import io.debezium.rest.SchemaResource;
import io.debezium.rest.jolokia.JolokiaClient; import io.debezium.rest.metrics.MetricsAttributes;
import io.debezium.rest.metrics.MetricsDescriptor;
import io.debezium.rest.model.DataCollection; import io.debezium.rest.model.DataCollection;
/** /**
@ -56,15 +60,21 @@ public MySqlConnector getConnector() {
} }
@Override @Override
public String getAttributesFilePath() { public MetricsDescriptor getMetrics(String connectorName, MBeanServer mBeanServer) throws MalformedObjectNameException {
return "META-INF/mysql-attributes.txt"; Map<String, String> connectorConfig = connectClusterState.connectorConfig(connectorName);
} String serverName = connectorConfig.get("topic.prefix");
String tasksMax = connectorConfig.get("tasks.max");
@Override ObjectName objectName = getObjectName("mysql", "streaming", serverName);
public List<JSONObject> getMetrics(String connectorName, JolokiaClient client, List<String> attributes) { Map<String, String> connectionAttributes = getAttributes(MetricsAttributes.getConnectionAttributes(), objectName, connectorName,
String serverName = connectClusterState.connectorConfig(connectorName).get("topic.prefix"); mBeanServer);
String task = connectClusterState.connectorConfig(connectorName).get("tasks.max"); Map<String, String> connectorAttributes = getAttributes(MetricsAttributes.getConnectorAttributes(), objectName, connectorName,
return client.getConnectorMetrics("mysql", serverName, Integer.valueOf(task), attributes); mBeanServer);
MetricsDescriptor.Connector connector = new MetricsDescriptor.Connector(connectionAttributes);
List<MetricsDescriptor.Task> tasks = List.of(new MetricsDescriptor.Task(0, List.of(new MetricsDescriptor.Database("", connectorAttributes))));
return new MetricsDescriptor(connectorName, tasksMax, connector, tasks);
} }
@GET @GET

View File

@ -1,3 +0,0 @@
Connected
MilliSecondsSinceLastEvent
TotalNumberOfEventsSeen

View File

@ -224,13 +224,11 @@ public void testMetricsEndpoint() {
.get(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.CONNECTOR_METRICS_ENDPOINT, connectorName) .get(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.CONNECTOR_METRICS_ENDPOINT, connectorName)
.then().log().all() .then().log().all()
.statusCode(200) .statusCode(200)
.assertThat().body("size()", is(3)) .body("name", equalTo(connectorName))
.body("[0].request.attribute", is("Connected")) .body("connector.metrics.Connected", equalTo("true"))
.body("[0].value", equalTo(true)) .body("tasks[0].id", equalTo(0))
.body("[1].request.attribute", is("MilliSecondsSinceLastEvent")) .body("tasks[0].database[0].metrics.MilliSecondsSinceLastEvent", equalTo("-1"))
.body("[1].value", equalTo(0)) .body("tasks[0].database[0].metrics.TotalNumberOfEventsSeen", equalTo("0"));
.body("[2].request.attribute", is("TotalNumberOfEventsSeen"))
.body("[2].value", equalTo(14234));
} }
public static ConnectorConfiguration getMySqlConnectorConfiguration(int id, String... options) { public static ConnectorConfiguration getMySqlConnectorConfiguration(int id, String... options) {

View File

@ -6,8 +6,12 @@
package io.debezium.connector.oracle.rest; package io.debezium.connector.oracle.rest;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.ws.rs.Consumes; import javax.ws.rs.Consumes;
import javax.ws.rs.GET; import javax.ws.rs.GET;
import javax.ws.rs.Path; import javax.ws.rs.Path;
@ -15,7 +19,6 @@
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import org.apache.kafka.connect.health.ConnectClusterState; import org.apache.kafka.connect.health.ConnectClusterState;
import org.json.simple.JSONObject;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.oracle.Module; import io.debezium.connector.oracle.Module;
@ -24,7 +27,8 @@
import io.debezium.rest.FilterValidationResource; import io.debezium.rest.FilterValidationResource;
import io.debezium.rest.MetricsResource; import io.debezium.rest.MetricsResource;
import io.debezium.rest.SchemaResource; import io.debezium.rest.SchemaResource;
import io.debezium.rest.jolokia.JolokiaClient; import io.debezium.rest.metrics.MetricsAttributes;
import io.debezium.rest.metrics.MetricsDescriptor;
import io.debezium.rest.model.DataCollection; import io.debezium.rest.model.DataCollection;
/** /**
@ -56,16 +60,20 @@ public OracleConnector getConnector() {
} }
@Override @Override
public String getAttributesFilePath() { public MetricsDescriptor getMetrics(String connectorName, MBeanServer mBeanServer) throws MalformedObjectNameException {
return "META-INF/oracle-attributes.txt";
}
@Override
public List<JSONObject> getMetrics(String connectorName, JolokiaClient client, List<String> attributes) {
String serverName = connectClusterState.connectorConfig(connectorName).get("topic.prefix"); String serverName = connectClusterState.connectorConfig(connectorName).get("topic.prefix");
String task = connectClusterState.connectorConfig(connectorName).get("tasks.max"); String tasksMax = connectClusterState.connectorConfig(connectorName).get("tasks.max");
return client.getConnectorMetrics("oracle", serverName, Integer.valueOf(task), attributes);
ObjectName objectName = getObjectName("oracle", "streaming", serverName);
Map<String, String> connectionAttributes = getAttributes(MetricsAttributes.getConnectionAttributes(), objectName, connectorName,
mBeanServer);
Map<String, String> connectorAttributes = getAttributes(MetricsAttributes.getConnectorAttributes(), objectName, connectorName,
mBeanServer);
MetricsDescriptor.Connector connector = new MetricsDescriptor.Connector(connectionAttributes);
List<MetricsDescriptor.Task> tasks = List.of(new MetricsDescriptor.Task(0, List.of(new MetricsDescriptor.Database("", connectorAttributes))));
return new MetricsDescriptor(connectorName, tasksMax, connector, tasks);
} }
@GET @GET

View File

@ -1,3 +0,0 @@
Connected
MilliSecondsSinceLastEvent
TotalNumberOfEventsSeen

View File

@ -197,13 +197,11 @@ public void testMetricsEndpoint() {
.get(DebeziumOracleConnectorResource.BASE_PATH + DebeziumOracleConnectorResource.CONNECTOR_METRICS_ENDPOINT, connectorName) .get(DebeziumOracleConnectorResource.BASE_PATH + DebeziumOracleConnectorResource.CONNECTOR_METRICS_ENDPOINT, connectorName)
.then().log().all() .then().log().all()
.statusCode(200) .statusCode(200)
.assertThat().body("size()", is(3)) .body("name", equalTo(connectorName))
.body("[0].request.attribute", is("Connected")) .body("connector.metrics.Connected", equalTo("true"))
.body("[0].value", equalTo(true)) .body("tasks[0].id", equalTo(0))
.body("[1].request.attribute", is("MilliSecondsSinceLastEvent")) .body("tasks[0].database[0].metrics.MilliSecondsSinceLastEvent", equalTo("-1"))
.body("[1].value", equalTo(-1)) .body("tasks[0].database[0].metrics.TotalNumberOfEventsSeen", equalTo("0"));
.body("[2].request.attribute", is("TotalNumberOfEventsSeen"))
.body("[2].value", equalTo(0));
} }
} }

View File

@ -6,8 +6,13 @@
package io.debezium.connector.postgresql.rest; package io.debezium.connector.postgresql.rest;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.ws.rs.Consumes; import javax.ws.rs.Consumes;
import javax.ws.rs.GET; import javax.ws.rs.GET;
import javax.ws.rs.Path; import javax.ws.rs.Path;
@ -15,7 +20,6 @@
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import org.apache.kafka.connect.health.ConnectClusterState; import org.apache.kafka.connect.health.ConnectClusterState;
import org.json.simple.JSONObject;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.Module; import io.debezium.connector.postgresql.Module;
@ -24,7 +28,8 @@
import io.debezium.rest.FilterValidationResource; import io.debezium.rest.FilterValidationResource;
import io.debezium.rest.MetricsResource; import io.debezium.rest.MetricsResource;
import io.debezium.rest.SchemaResource; import io.debezium.rest.SchemaResource;
import io.debezium.rest.jolokia.JolokiaClient; import io.debezium.rest.metrics.MetricsAttributes;
import io.debezium.rest.metrics.MetricsDescriptor;
import io.debezium.rest.model.DataCollection; import io.debezium.rest.model.DataCollection;
/** /**
@ -57,15 +62,21 @@ public PostgresConnector getConnector() {
} }
@Override @Override
public String getAttributesFilePath() { public MetricsDescriptor getMetrics(String connectorName, MBeanServer mBeanServer) throws MalformedObjectNameException {
return "META-INF/postgres-attributes.txt"; Map<String, String> connectorConfig = connectClusterState.connectorConfig(connectorName);
} String serverName = connectorConfig.get("topic.prefix");
String tasksMax = connectorConfig.get("tasks.max");
@Override ObjectName objectName = getObjectName("postgres", "streaming", serverName);
public List<JSONObject> getMetrics(String connectorName, JolokiaClient client, List<String> attributes) { Map<String, String> connectionAttributes = getAttributes(MetricsAttributes.getConnectionAttributes(), objectName, connectorName,
String serverName = connectClusterState.connectorConfig(connectorName).get("topic.prefix"); mBeanServer);
String task = connectClusterState.connectorConfig(connectorName).get("tasks.max"); Map<String, String> connectorAttributes = getAttributes(MetricsAttributes.getConnectorAttributes(), objectName, connectorName,
return client.getConnectorMetrics("postgres", serverName, Integer.valueOf(task), attributes); mBeanServer);
MetricsDescriptor.Connector connector = new MetricsDescriptor.Connector(connectionAttributes);
List<MetricsDescriptor.Task> tasks = List.of(new MetricsDescriptor.Task(0, List.of(new MetricsDescriptor.Database("", connectorAttributes))));
return new MetricsDescriptor(connectorName, tasksMax, connector, tasks);
} }
public String getSchemaFilePath() { public String getSchemaFilePath() {

View File

@ -1,3 +0,0 @@
Connected
MilliSecondsSinceLastEvent
TotalNumberOfEventsSeen

View File

@ -222,13 +222,12 @@ public void testMetricsEndpoint() {
.get(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.CONNECTOR_METRICS_ENDPOINT, connectorName) .get(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.CONNECTOR_METRICS_ENDPOINT, connectorName)
.then().log().all() .then().log().all()
.statusCode(200) .statusCode(200)
.assertThat().body("size()", is(3)) .body("name", equalTo(connectorName))
.body("[0].request.attribute", is("Connected")) .body("connector.metrics.Connected", equalTo("true"))
.body("[0].value", equalTo(true)) .body("tasks[0].id", equalTo(0))
.body("[1].request.attribute", is("MilliSecondsSinceLastEvent")) .body("tasks[0].database[0].metrics.MilliSecondsSinceLastEvent", equalTo("-1"))
.body("[1].value", equalTo(-1)) .body("tasks[0].database[0].metrics.TotalNumberOfEventsSeen", equalTo("0"));
.body("[2].request.attribute", is("TotalNumberOfEventsSeen"))
.body("[2].value", equalTo(0));
} }
private static ConnectorConfiguration getPostgresConnectorConfiguration(int id, String... options) { private static ConnectorConfiguration getPostgresConnectorConfiguration(int id, String... options) {

View File

@ -6,8 +6,13 @@
package io.debezium.connector.sqlserver.rest; package io.debezium.connector.sqlserver.rest;
import java.util.List; import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.ws.rs.Consumes; import javax.ws.rs.Consumes;
import javax.ws.rs.GET; import javax.ws.rs.GET;
import javax.ws.rs.Path; import javax.ws.rs.Path;
@ -15,7 +20,6 @@
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import org.apache.kafka.connect.health.ConnectClusterState; import org.apache.kafka.connect.health.ConnectClusterState;
import org.json.simple.JSONObject;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.Module; import io.debezium.connector.sqlserver.Module;
@ -24,7 +28,8 @@
import io.debezium.rest.FilterValidationResource; import io.debezium.rest.FilterValidationResource;
import io.debezium.rest.MetricsResource; import io.debezium.rest.MetricsResource;
import io.debezium.rest.SchemaResource; import io.debezium.rest.SchemaResource;
import io.debezium.rest.jolokia.JolokiaClient; import io.debezium.rest.metrics.MetricsAttributes;
import io.debezium.rest.metrics.MetricsDescriptor;
import io.debezium.rest.model.DataCollection; import io.debezium.rest.model.DataCollection;
/** /**
@ -57,15 +62,33 @@ public SqlServerConnector getConnector() {
} }
@Override @Override
public String getAttributesFilePath() { public MetricsDescriptor getMetrics(String connectorName, MBeanServer mBeanServer)
return "META-INF/sqlserver-attributes.txt"; throws MalformedObjectNameException {
} Map<String, String> connectorConfig = connectClusterState.connectorConfig(connectorName);
String serverName = connectorConfig.get("topic.prefix");
String tasksMax = connectorConfig.get("tasks.max");
String databaseNames = connectorConfig.get("database.names");
String[] databaseNamesArray = databaseNames.split(",");
@Override MetricsDescriptor.Connector connector = null;
public List<JSONObject> getMetrics(String connectorName, JolokiaClient client, List<String> attributes) { List<MetricsDescriptor.Task> tasks = new ArrayList<>();
String serverName = connectClusterState.connectorConfig(connectorName).get("topic.prefix");
String task = connectClusterState.connectorConfig(connectorName).get("tasks.max"); for (int task = 0; task < Integer.parseInt(tasksMax); task++) {
return client.getConnectorMetrics("sql_server", serverName, Integer.valueOf(task), attributes); ObjectName objectName = getObjectNameWithTask("sql_server", "streaming", serverName, String.valueOf(task));
Map<String, String> connectionAttributes = getAttributes(MetricsAttributes.getConnectionAttributes(), objectName, connectorName, mBeanServer);
connector = new MetricsDescriptor.Connector(connectionAttributes);
List<MetricsDescriptor.Database> databases = new ArrayList<>();
for (String databaseName : databaseNamesArray) {
ObjectName objectNameWithDatabase = getObjectNameWithDatabase("sql_server", "streaming", serverName, String.valueOf(task), databaseName);
Map<String, String> connectorAttributes = getAttributes(MetricsAttributes.getConnectorAttributes(), objectNameWithDatabase, connectorName, mBeanServer);
databases.add(new MetricsDescriptor.Database(databaseName, connectorAttributes));
}
tasks.add(new MetricsDescriptor.Task(task, databases));
}
return new MetricsDescriptor(connectorName, tasksMax, connector, tasks);
} }
@Override @Override

View File

@ -1,3 +0,0 @@
Connected
MilliSecondsSinceLastEvent
TotalNumberOfEventsSeen

View File

@ -204,13 +204,15 @@ public void testMetricsEndpoint() {
.get(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.CONNECTOR_METRICS_ENDPOINT, connectorName) .get(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.CONNECTOR_METRICS_ENDPOINT, connectorName)
.then().log().all() .then().log().all()
.statusCode(200) .statusCode(200)
.assertThat().body("size()", is(3)) .body("name", equalTo(connectorName))
.body("[0].request.attribute", is("Connected")) .body("connector.metrics.Connected", equalTo("true"))
.body("[0].value", equalTo(true)) .body("tasks[0].database[0].name", equalTo("testDB"))
.body("[1].request.attribute", is("MilliSecondsSinceLastEvent")) .body("tasks[0].database[0].metrics.MilliSecondsSinceLastEvent", equalTo("-1"))
.body("[1].value", equalTo(-1)) .body("tasks[0].database[0].metrics.TotalNumberOfEventsSeen", equalTo("0"))
.body("[2].request.attribute", is("TotalNumberOfEventsSeen")) .body("tasks[0].database[1].name", equalTo("testDB2"))
.body("[2].value", equalTo(0)); .body("tasks[0].database[1].metrics.MilliSecondsSinceLastEvent", equalTo("-1"))
.body("tasks[0].database[1].metrics.TotalNumberOfEventsSeen", equalTo("0"));
} }
public static ConnectorConfiguration getSqlServerConnectorConfiguration(int id, String... options) { public static ConnectorConfiguration getSqlServerConnectorConfiguration(int id, String... options) {

View File

@ -5,8 +5,19 @@
*/ */
package io.debezium.rest; package io.debezium.rest;
import java.lang.management.ManagementFactory;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.management.AttributeNotFoundException;
import javax.management.InstanceNotFoundException;
import javax.management.IntrospectionException;
import javax.management.MBeanException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.ReflectionException;
import javax.ws.rs.GET; import javax.ws.rs.GET;
import javax.ws.rs.Path; import javax.ws.rs.Path;
import javax.ws.rs.PathParam; import javax.ws.rs.PathParam;
@ -14,30 +25,63 @@
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.Connector;
import org.json.simple.JSONObject; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.rest.jolokia.JolokiaAttributes; import io.debezium.rest.metrics.MetricsDescriptor;
import io.debezium.rest.jolokia.JolokiaClient;
public interface MetricsResource { public interface MetricsResource {
Connector getConnector(); Logger LOGGER = LoggerFactory.getLogger(MetricsResource.class);
String getAttributesFilePath();
List<JSONObject> getMetrics(String connectorName, JolokiaClient client, List<String> attributes);
String CONNECTOR_METRICS_ENDPOINT = "/connectors/{connector-name}/metrics"; String CONNECTOR_METRICS_ENDPOINT = "/connectors/{connector-name}/metrics";
Connector getConnector();
MetricsDescriptor getMetrics(String connectorName, MBeanServer mBeanServer)
throws MalformedObjectNameException, InstanceNotFoundException, IntrospectionException, ReflectionException, AttributeNotFoundException, MBeanException;
default ObjectName getObjectName(String connector, String context, String serverName) throws MalformedObjectNameException {
return new ObjectName(String.format("debezium.%s:type=connector-metrics,context=%s,server=%s", connector, context, serverName));
}
default ObjectName getObjectNameWithTask(String connector, String context, String serverName, String task) throws MalformedObjectNameException {
return new ObjectName(
String.format("debezium.%s:type=connector-metrics,context=%s,server=%s,task=%s", connector, context, serverName, task));
}
default ObjectName getObjectNameWithDatabase(String connector, String context, String serverName, String task, String databaseName)
throws MalformedObjectNameException {
return new ObjectName(
String.format("debezium.%s:type=connector-metrics,context=%s,server=%s,task=%s,database=%s", connector, context, serverName, task, databaseName));
}
default Map<String, String> getAttributes(List<String> attributes, ObjectName objectName, String connectorName, MBeanServer mBeanServer) {
return attributes.stream()
.collect(Collectors.toMap(
attribute -> attribute,
attribute -> getAttributeValue(attribute, objectName, connectorName, mBeanServer)));
}
default String getAttributeValue(String attribute, ObjectName objectName, String connectorName, MBeanServer mBeanServer) {
try {
return mBeanServer.getAttribute(objectName, attribute).toString();
}
catch (Exception e) {
throw new RuntimeException("Failed to get attribute " + attribute + " for connector " + connectorName + e);
}
}
@GET @GET
@Path(CONNECTOR_METRICS_ENDPOINT) @Path(CONNECTOR_METRICS_ENDPOINT)
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
default List<JSONObject> getConnectorMetrics(@PathParam("connector-name") String connectorName) { default MetricsDescriptor getConnectorMetrics(@PathParam("connector-name") String connectorName)
throws ReflectionException, MalformedObjectNameException, InstanceNotFoundException, IntrospectionException, AttributeNotFoundException, MBeanException {
if (getConnector() == null) { if (getConnector() == null) {
throw new RuntimeException("Unable to fetch metrics for connector " + connectorName + " as the connector is not available."); throw new RuntimeException("Unable to fetch metrics for connector " + connectorName + " as the connector is not available.");
} }
JolokiaClient jolokiaClient = new JolokiaClient();
JolokiaAttributes jolokiaAttributes = new JolokiaAttributes(getAttributesFilePath()); MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
List<String> attributes = jolokiaAttributes.getAttributeNames(); return getMetrics(connectorName, mBeanServer);
return getMetrics(connectorName, jolokiaClient, attributes);
} }
} }

View File

@ -1,42 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.rest.jolokia;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class JolokiaAttributes {
private List<String> attributeNames = new ArrayList<>();
public JolokiaAttributes(String attributesFilePath) {
try (InputStream stream = getClass().getClassLoader().getResourceAsStream(attributesFilePath)) {
if (stream != null) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
String attribute;
while ((attribute = reader.readLine()) != null) {
attributeNames.add(attribute);
}
}
}
else {
attributeNames = Collections.emptyList();
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
public List<String> getAttributeNames() {
return attributeNames;
}
}

View File

@ -1,60 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.rest.jolokia;
import java.util.ArrayList;
import java.util.List;
import javax.management.MalformedObjectNameException;
import org.jolokia.client.BasicAuthenticator;
import org.jolokia.client.J4pClient;
import org.jolokia.client.exception.J4pException;
import org.jolokia.client.request.J4pReadRequest;
import org.jolokia.client.request.J4pReadResponse;
import org.json.simple.JSONObject;
public class JolokiaClient {
public static final Integer DEFAULT_JOLOKIA_PORT = 8778;
public static final String DEFAULT_JOLOKIA_URL = "http://localhost:" + DEFAULT_JOLOKIA_PORT + "/jolokia";
public List<JSONObject> getConnectorMetrics(String type, String server, Integer taskMax, List<String> attributes) {
List<JSONObject> metrics = new ArrayList<>();
try {
J4pClient client = createJolokiaClient();
String baseMbeanName = String.format("debezium.%s:type=connector-metrics,context=streaming,server=%s", type, server);
if ("sql_server".equals(type) || "mongodb".equals(type)) {
for (int task = 0; task < Math.max(1, taskMax); task++) {
String mbeanName = String.format("%s,task=%s", baseMbeanName, task);
addMetrics(client, metrics, mbeanName, attributes);
}
}
else {
addMetrics(client, metrics, baseMbeanName, attributes);
}
}
catch (J4pException | MalformedObjectNameException e) {
throw new RuntimeException(e);
}
return metrics;
}
private void addMetrics(J4pClient client, List<JSONObject> metrics, String mbeanName, List<String> attributes) throws J4pException, MalformedObjectNameException {
for (String attribute : attributes) {
J4pReadRequest request = new J4pReadRequest(mbeanName, attribute);
J4pReadResponse response = client.execute(request);
metrics.add(response.asJSONObject());
}
}
private J4pClient createJolokiaClient() {
return J4pClient.url(JolokiaClient.DEFAULT_JOLOKIA_URL)
.authenticator(new BasicAuthenticator().preemptive())
.connectionTimeout(3000)
.build();
}
}

View File

@ -0,0 +1,24 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.rest.metrics;
import java.util.List;
/**
* Class containing the attributes names for the metrics exposed via REST API
*/
public class MetricsAttributes {
private static final List<String> CONNECTION_ATTRIBUTES = List.of("Connected");
private static final List<String> CONNECTOR_ATTRIBUTES = List.of("MilliSecondsSinceLastEvent", "TotalNumberOfEventsSeen");
public static List<String> getConnectionAttributes() {
return CONNECTION_ATTRIBUTES;
}
public static List<String> getConnectorAttributes() {
return CONNECTOR_ATTRIBUTES;
}
}

View File

@ -0,0 +1,77 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.rest.metrics;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* Base class for JSON payloads describing connector metrics
*
* @author Anisha Mohanty
*/
public class MetricsDescriptor {
@JsonProperty("name")
private String name;
@JsonProperty("tasks.max")
private String tasksMax;
@JsonProperty("connector")
private Connector connector;
@JsonProperty("tasks")
private List<Task> tasks;
public MetricsDescriptor(String name, String tasksMax, Connector connector, List<Task> tasks) {
this.name = name;
this.tasksMax = tasksMax;
this.connector = connector;
this.tasks = tasks;
}
public static class Connector {
@JsonProperty()
private Map<String, String> metrics;
public Connector(Map<String, String> metrics) {
this.metrics = metrics;
}
}
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public static class Task {
@JsonProperty("id")
private int id;
@JsonProperty("database")
private List<Database> databases;
public Task(int id, List<Database> databases) {
this.id = id;
this.databases = databases;
}
}
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public static class Database {
@JsonProperty("name")
private String name;
@JsonProperty()
private Map<String, String> metrics;
public Database(String name, Map<String, String> metrics) {
this.name = name;
this.metrics = metrics;
}
}
}