From 2555a1cee2f3e62a1386d70f3401dc8cc287e264 Mon Sep 17 00:00:00 2001 From: ani-sha Date: Mon, 11 Dec 2023 12:33:45 +0530 Subject: [PATCH] DBZ-6764 DBZ-7178 DBZ-7177 Use MBeanServer to query metrics for REST endpoint and create new metrics descriptor --- .../DebeziumMongoDbConnectorResource.java | 34 +++++--- .../resources/META-INF/mongodb-attributes.txt | 3 - .../DebeziumMongoDbConnectorResourceIT.java | 13 ++-- .../rest/DebeziumMySqlConnectorResource.java | 30 +++++--- .../resources/META-INF/mysql-attributes.txt | 3 - .../DebeziumMySqlConnectorResourceIT.java | 12 ++- .../rest/DebeziumOracleConnectorResource.java | 28 ++++--- .../resources/META-INF/oracle-attributes.txt | 3 - .../DebeziumOracleConnectorResourceIT.java | 12 ++- .../DebeziumPostgresConnectorResource.java | 31 +++++--- .../META-INF/postgres-attributes.txt | 3 - .../DebeziumPostgresConnectorResourceIT.java | 13 ++-- .../DebeziumSqlServerConnectorResource.java | 43 ++++++++--- .../META-INF/sqlserver-attributes.txt | 3 - .../DebeziumSqlServerConnectorResourceIT.java | 16 ++-- .../io/debezium/rest/MetricsResource.java | 70 +++++++++++++---- .../rest/jolokia/JolokiaAttributes.java | 42 ---------- .../debezium/rest/jolokia/JolokiaClient.java | 60 --------------- .../rest/metrics/MetricsAttributes.java | 24 ++++++ .../rest/metrics/MetricsDescriptor.java | 77 +++++++++++++++++++ 20 files changed, 305 insertions(+), 215 deletions(-) delete mode 100644 debezium-connector-mongodb/src/main/resources/META-INF/mongodb-attributes.txt delete mode 100644 debezium-connector-mysql/src/main/resources/META-INF/mysql-attributes.txt delete mode 100644 debezium-connector-oracle/src/main/resources/META-INF/oracle-attributes.txt delete mode 100644 debezium-connector-postgres/src/main/resources/META-INF/postgres-attributes.txt delete mode 100644 debezium-connector-sqlserver/src/main/resources/META-INF/sqlserver-attributes.txt delete mode 100644 debezium-core/src/main/java/io/debezium/rest/jolokia/JolokiaAttributes.java delete mode 100644 debezium-core/src/main/java/io/debezium/rest/jolokia/JolokiaClient.java create mode 100644 debezium-core/src/main/java/io/debezium/rest/metrics/MetricsAttributes.java create mode 100644 debezium-core/src/main/java/io/debezium/rest/metrics/MetricsDescriptor.java diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/rest/DebeziumMongoDbConnectorResource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/rest/DebeziumMongoDbConnectorResource.java index 8e7c8bf99..4d7346fd6 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/rest/DebeziumMongoDbConnectorResource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/rest/DebeziumMongoDbConnectorResource.java @@ -5,9 +5,14 @@ */ package io.debezium.connector.mongodb.rest; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; import javax.ws.rs.Consumes; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -15,7 +20,6 @@ import javax.ws.rs.core.MediaType; import org.apache.kafka.connect.health.ConnectClusterState; -import org.json.simple.JSONObject; import io.debezium.config.Configuration; import io.debezium.connector.mongodb.Module; @@ -24,7 +28,8 @@ import io.debezium.rest.FilterValidationResource; import io.debezium.rest.MetricsResource; import io.debezium.rest.SchemaResource; -import io.debezium.rest.jolokia.JolokiaClient; +import io.debezium.rest.metrics.MetricsAttributes; +import io.debezium.rest.metrics.MetricsDescriptor; import io.debezium.rest.model.DataCollection; /** @@ -57,15 +62,24 @@ public MongoDbConnector getConnector() { } @Override - public String getAttributesFilePath() { - return "META-INF/mongodb-attributes.txt"; - } + public MetricsDescriptor getMetrics(String connectorName, MBeanServer mBeanServer) + throws MalformedObjectNameException { + Map connectorConfig = connectClusterState.connectorConfig(connectorName); + String serverName = connectorConfig.get("topic.prefix"); + String tasksMax = connectorConfig.get("tasks.max"); - @Override - public List getMetrics(String connectorName, JolokiaClient client, List attributes) { - String serverName = connectClusterState.connectorConfig(connectorName).get("topic.prefix"); - String task = connectClusterState.connectorConfig(connectorName).get("tasks.max"); - return client.getConnectorMetrics("mongodb", serverName, Integer.valueOf(task), attributes); + MetricsDescriptor.Connector connector = null; + List tasks = new ArrayList<>(); + + for (int task = 0; task < Integer.parseInt(tasksMax); task++) { + ObjectName objectName = getObjectNameWithTask("mongodb", "streaming", serverName, String.valueOf(task)); + Map connectionAttributes = getAttributes(MetricsAttributes.getConnectionAttributes(), objectName, connectorName, mBeanServer); + Map connectorAttributes = getAttributes(MetricsAttributes.getConnectorAttributes(), objectName, connectorName, mBeanServer); + + connector = new MetricsDescriptor.Connector(connectionAttributes); + tasks.add(new MetricsDescriptor.Task(task, List.of(new MetricsDescriptor.Database("", connectorAttributes)))); + } + return new MetricsDescriptor(connectorName, tasksMax, connector, tasks); } @Override diff --git a/debezium-connector-mongodb/src/main/resources/META-INF/mongodb-attributes.txt b/debezium-connector-mongodb/src/main/resources/META-INF/mongodb-attributes.txt deleted file mode 100644 index a9f39e8f7..000000000 --- a/debezium-connector-mongodb/src/main/resources/META-INF/mongodb-attributes.txt +++ /dev/null @@ -1,3 +0,0 @@ -Connected -MilliSecondsSinceLastEvent -TotalNumberOfEventsSeen \ No newline at end of file diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/rest/DebeziumMongoDbConnectorResourceIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/rest/DebeziumMongoDbConnectorResourceIT.java index 0b1dd07e2..d1b9d574d 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/rest/DebeziumMongoDbConnectorResourceIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/rest/DebeziumMongoDbConnectorResourceIT.java @@ -223,13 +223,12 @@ public void testMetricsEndpoint() { .get(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.CONNECTOR_METRICS_ENDPOINT, connectorName) .then().log().all() .statusCode(200) - .assertThat().body("size()", is(3)) - .body("[0].request.attribute", is("Connected")) - .body("[0].value", equalTo(true)) - .body("[1].request.attribute", is("MilliSecondsSinceLastEvent")) - .body("[1].value", equalTo(-1)) - .body("[2].request.attribute", is("TotalNumberOfEventsSeen")) - .body("[2].value", equalTo(0)); + .body("name", equalTo(connectorName)) + .body("connector.metrics.Connected", equalTo("true")) + .body("tasks[0].id", equalTo(0)) + .body("tasks[0].database[0].metrics.MilliSecondsSinceLastEvent", equalTo("-1")) + .body("tasks[0].database[0].metrics.TotalNumberOfEventsSeen", equalTo("0")); + } public static ConnectorConfiguration getMongoDbConnectorConfiguration(int id, String... options) { diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/rest/DebeziumMySqlConnectorResource.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/rest/DebeziumMySqlConnectorResource.java index cc55546f6..554911d3c 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/rest/DebeziumMySqlConnectorResource.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/rest/DebeziumMySqlConnectorResource.java @@ -6,8 +6,12 @@ package io.debezium.connector.mysql.rest; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; import javax.ws.rs.Consumes; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -15,7 +19,6 @@ import javax.ws.rs.core.MediaType; import org.apache.kafka.connect.health.ConnectClusterState; -import org.json.simple.JSONObject; import io.debezium.config.Configuration; import io.debezium.connector.mysql.Module; @@ -24,7 +27,8 @@ import io.debezium.rest.FilterValidationResource; import io.debezium.rest.MetricsResource; import io.debezium.rest.SchemaResource; -import io.debezium.rest.jolokia.JolokiaClient; +import io.debezium.rest.metrics.MetricsAttributes; +import io.debezium.rest.metrics.MetricsDescriptor; import io.debezium.rest.model.DataCollection; /** @@ -56,15 +60,21 @@ public MySqlConnector getConnector() { } @Override - public String getAttributesFilePath() { - return "META-INF/mysql-attributes.txt"; - } + public MetricsDescriptor getMetrics(String connectorName, MBeanServer mBeanServer) throws MalformedObjectNameException { + Map connectorConfig = connectClusterState.connectorConfig(connectorName); + String serverName = connectorConfig.get("topic.prefix"); + String tasksMax = connectorConfig.get("tasks.max"); - @Override - public List getMetrics(String connectorName, JolokiaClient client, List attributes) { - String serverName = connectClusterState.connectorConfig(connectorName).get("topic.prefix"); - String task = connectClusterState.connectorConfig(connectorName).get("tasks.max"); - return client.getConnectorMetrics("mysql", serverName, Integer.valueOf(task), attributes); + ObjectName objectName = getObjectName("mysql", "streaming", serverName); + Map connectionAttributes = getAttributes(MetricsAttributes.getConnectionAttributes(), objectName, connectorName, + mBeanServer); + Map connectorAttributes = getAttributes(MetricsAttributes.getConnectorAttributes(), objectName, connectorName, + mBeanServer); + + MetricsDescriptor.Connector connector = new MetricsDescriptor.Connector(connectionAttributes); + List tasks = List.of(new MetricsDescriptor.Task(0, List.of(new MetricsDescriptor.Database("", connectorAttributes)))); + + return new MetricsDescriptor(connectorName, tasksMax, connector, tasks); } @GET diff --git a/debezium-connector-mysql/src/main/resources/META-INF/mysql-attributes.txt b/debezium-connector-mysql/src/main/resources/META-INF/mysql-attributes.txt deleted file mode 100644 index a9f39e8f7..000000000 --- a/debezium-connector-mysql/src/main/resources/META-INF/mysql-attributes.txt +++ /dev/null @@ -1,3 +0,0 @@ -Connected -MilliSecondsSinceLastEvent -TotalNumberOfEventsSeen \ No newline at end of file diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/rest/DebeziumMySqlConnectorResourceIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/rest/DebeziumMySqlConnectorResourceIT.java index b8415fee1..dff559c48 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/rest/DebeziumMySqlConnectorResourceIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/rest/DebeziumMySqlConnectorResourceIT.java @@ -224,13 +224,11 @@ public void testMetricsEndpoint() { .get(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.CONNECTOR_METRICS_ENDPOINT, connectorName) .then().log().all() .statusCode(200) - .assertThat().body("size()", is(3)) - .body("[0].request.attribute", is("Connected")) - .body("[0].value", equalTo(true)) - .body("[1].request.attribute", is("MilliSecondsSinceLastEvent")) - .body("[1].value", equalTo(0)) - .body("[2].request.attribute", is("TotalNumberOfEventsSeen")) - .body("[2].value", equalTo(14234)); + .body("name", equalTo(connectorName)) + .body("connector.metrics.Connected", equalTo("true")) + .body("tasks[0].id", equalTo(0)) + .body("tasks[0].database[0].metrics.MilliSecondsSinceLastEvent", equalTo("-1")) + .body("tasks[0].database[0].metrics.TotalNumberOfEventsSeen", equalTo("0")); } public static ConnectorConfiguration getMySqlConnectorConfiguration(int id, String... options) { diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/rest/DebeziumOracleConnectorResource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/rest/DebeziumOracleConnectorResource.java index 59455e92c..92c44713c 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/rest/DebeziumOracleConnectorResource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/rest/DebeziumOracleConnectorResource.java @@ -6,8 +6,12 @@ package io.debezium.connector.oracle.rest; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; import javax.ws.rs.Consumes; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -15,7 +19,6 @@ import javax.ws.rs.core.MediaType; import org.apache.kafka.connect.health.ConnectClusterState; -import org.json.simple.JSONObject; import io.debezium.config.Configuration; import io.debezium.connector.oracle.Module; @@ -24,7 +27,8 @@ import io.debezium.rest.FilterValidationResource; import io.debezium.rest.MetricsResource; import io.debezium.rest.SchemaResource; -import io.debezium.rest.jolokia.JolokiaClient; +import io.debezium.rest.metrics.MetricsAttributes; +import io.debezium.rest.metrics.MetricsDescriptor; import io.debezium.rest.model.DataCollection; /** @@ -56,16 +60,20 @@ public OracleConnector getConnector() { } @Override - public String getAttributesFilePath() { - return "META-INF/oracle-attributes.txt"; - } - - @Override - public List getMetrics(String connectorName, JolokiaClient client, List attributes) { + public MetricsDescriptor getMetrics(String connectorName, MBeanServer mBeanServer) throws MalformedObjectNameException { String serverName = connectClusterState.connectorConfig(connectorName).get("topic.prefix"); - String task = connectClusterState.connectorConfig(connectorName).get("tasks.max"); - return client.getConnectorMetrics("oracle", serverName, Integer.valueOf(task), attributes); + String tasksMax = connectClusterState.connectorConfig(connectorName).get("tasks.max"); + ObjectName objectName = getObjectName("oracle", "streaming", serverName); + Map connectionAttributes = getAttributes(MetricsAttributes.getConnectionAttributes(), objectName, connectorName, + mBeanServer); + Map connectorAttributes = getAttributes(MetricsAttributes.getConnectorAttributes(), objectName, connectorName, + mBeanServer); + + MetricsDescriptor.Connector connector = new MetricsDescriptor.Connector(connectionAttributes); + List tasks = List.of(new MetricsDescriptor.Task(0, List.of(new MetricsDescriptor.Database("", connectorAttributes)))); + + return new MetricsDescriptor(connectorName, tasksMax, connector, tasks); } @GET diff --git a/debezium-connector-oracle/src/main/resources/META-INF/oracle-attributes.txt b/debezium-connector-oracle/src/main/resources/META-INF/oracle-attributes.txt deleted file mode 100644 index a9f39e8f7..000000000 --- a/debezium-connector-oracle/src/main/resources/META-INF/oracle-attributes.txt +++ /dev/null @@ -1,3 +0,0 @@ -Connected -MilliSecondsSinceLastEvent -TotalNumberOfEventsSeen \ No newline at end of file diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/rest/DebeziumOracleConnectorResourceIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/rest/DebeziumOracleConnectorResourceIT.java index 63fa476b2..96fc472b9 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/rest/DebeziumOracleConnectorResourceIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/rest/DebeziumOracleConnectorResourceIT.java @@ -197,13 +197,11 @@ public void testMetricsEndpoint() { .get(DebeziumOracleConnectorResource.BASE_PATH + DebeziumOracleConnectorResource.CONNECTOR_METRICS_ENDPOINT, connectorName) .then().log().all() .statusCode(200) - .assertThat().body("size()", is(3)) - .body("[0].request.attribute", is("Connected")) - .body("[0].value", equalTo(true)) - .body("[1].request.attribute", is("MilliSecondsSinceLastEvent")) - .body("[1].value", equalTo(-1)) - .body("[2].request.attribute", is("TotalNumberOfEventsSeen")) - .body("[2].value", equalTo(0)); + .body("name", equalTo(connectorName)) + .body("connector.metrics.Connected", equalTo("true")) + .body("tasks[0].id", equalTo(0)) + .body("tasks[0].database[0].metrics.MilliSecondsSinceLastEvent", equalTo("-1")) + .body("tasks[0].database[0].metrics.TotalNumberOfEventsSeen", equalTo("0")); } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/rest/DebeziumPostgresConnectorResource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/rest/DebeziumPostgresConnectorResource.java index ec10cec2a..362d63c76 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/rest/DebeziumPostgresConnectorResource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/rest/DebeziumPostgresConnectorResource.java @@ -6,8 +6,13 @@ package io.debezium.connector.postgresql.rest; import java.util.List; +import java.util.Map; + import java.util.stream.Collectors; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; import javax.ws.rs.Consumes; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -15,7 +20,6 @@ import javax.ws.rs.core.MediaType; import org.apache.kafka.connect.health.ConnectClusterState; -import org.json.simple.JSONObject; import io.debezium.config.Configuration; import io.debezium.connector.postgresql.Module; @@ -24,7 +28,8 @@ import io.debezium.rest.FilterValidationResource; import io.debezium.rest.MetricsResource; import io.debezium.rest.SchemaResource; -import io.debezium.rest.jolokia.JolokiaClient; +import io.debezium.rest.metrics.MetricsAttributes; +import io.debezium.rest.metrics.MetricsDescriptor; import io.debezium.rest.model.DataCollection; /** @@ -57,15 +62,21 @@ public PostgresConnector getConnector() { } @Override - public String getAttributesFilePath() { - return "META-INF/postgres-attributes.txt"; - } + public MetricsDescriptor getMetrics(String connectorName, MBeanServer mBeanServer) throws MalformedObjectNameException { + Map connectorConfig = connectClusterState.connectorConfig(connectorName); + String serverName = connectorConfig.get("topic.prefix"); + String tasksMax = connectorConfig.get("tasks.max"); - @Override - public List getMetrics(String connectorName, JolokiaClient client, List attributes) { - String serverName = connectClusterState.connectorConfig(connectorName).get("topic.prefix"); - String task = connectClusterState.connectorConfig(connectorName).get("tasks.max"); - return client.getConnectorMetrics("postgres", serverName, Integer.valueOf(task), attributes); + ObjectName objectName = getObjectName("postgres", "streaming", serverName); + Map connectionAttributes = getAttributes(MetricsAttributes.getConnectionAttributes(), objectName, connectorName, + mBeanServer); + Map connectorAttributes = getAttributes(MetricsAttributes.getConnectorAttributes(), objectName, connectorName, + mBeanServer); + + MetricsDescriptor.Connector connector = new MetricsDescriptor.Connector(connectionAttributes); + List tasks = List.of(new MetricsDescriptor.Task(0, List.of(new MetricsDescriptor.Database("", connectorAttributes)))); + + return new MetricsDescriptor(connectorName, tasksMax, connector, tasks); } public String getSchemaFilePath() { diff --git a/debezium-connector-postgres/src/main/resources/META-INF/postgres-attributes.txt b/debezium-connector-postgres/src/main/resources/META-INF/postgres-attributes.txt deleted file mode 100644 index a9f39e8f7..000000000 --- a/debezium-connector-postgres/src/main/resources/META-INF/postgres-attributes.txt +++ /dev/null @@ -1,3 +0,0 @@ -Connected -MilliSecondsSinceLastEvent -TotalNumberOfEventsSeen \ No newline at end of file diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/rest/DebeziumPostgresConnectorResourceIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/rest/DebeziumPostgresConnectorResourceIT.java index 7444cab48..dbd9bd3a3 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/rest/DebeziumPostgresConnectorResourceIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/rest/DebeziumPostgresConnectorResourceIT.java @@ -222,13 +222,12 @@ public void testMetricsEndpoint() { .get(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.CONNECTOR_METRICS_ENDPOINT, connectorName) .then().log().all() .statusCode(200) - .assertThat().body("size()", is(3)) - .body("[0].request.attribute", is("Connected")) - .body("[0].value", equalTo(true)) - .body("[1].request.attribute", is("MilliSecondsSinceLastEvent")) - .body("[1].value", equalTo(-1)) - .body("[2].request.attribute", is("TotalNumberOfEventsSeen")) - .body("[2].value", equalTo(0)); + .body("name", equalTo(connectorName)) + .body("connector.metrics.Connected", equalTo("true")) + .body("tasks[0].id", equalTo(0)) + .body("tasks[0].database[0].metrics.MilliSecondsSinceLastEvent", equalTo("-1")) + .body("tasks[0].database[0].metrics.TotalNumberOfEventsSeen", equalTo("0")); + } private static ConnectorConfiguration getPostgresConnectorConfiguration(int id, String... options) { diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/rest/DebeziumSqlServerConnectorResource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/rest/DebeziumSqlServerConnectorResource.java index 9c908b4a4..548fb3175 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/rest/DebeziumSqlServerConnectorResource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/rest/DebeziumSqlServerConnectorResource.java @@ -6,8 +6,13 @@ package io.debezium.connector.sqlserver.rest; import java.util.List; +import java.util.ArrayList; +import java.util.Map; import java.util.stream.Collectors; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; import javax.ws.rs.Consumes; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -15,7 +20,6 @@ import javax.ws.rs.core.MediaType; import org.apache.kafka.connect.health.ConnectClusterState; -import org.json.simple.JSONObject; import io.debezium.config.Configuration; import io.debezium.connector.sqlserver.Module; @@ -24,7 +28,8 @@ import io.debezium.rest.FilterValidationResource; import io.debezium.rest.MetricsResource; import io.debezium.rest.SchemaResource; -import io.debezium.rest.jolokia.JolokiaClient; +import io.debezium.rest.metrics.MetricsAttributes; +import io.debezium.rest.metrics.MetricsDescriptor; import io.debezium.rest.model.DataCollection; /** @@ -57,15 +62,33 @@ public SqlServerConnector getConnector() { } @Override - public String getAttributesFilePath() { - return "META-INF/sqlserver-attributes.txt"; - } + public MetricsDescriptor getMetrics(String connectorName, MBeanServer mBeanServer) + throws MalformedObjectNameException { + Map connectorConfig = connectClusterState.connectorConfig(connectorName); + String serverName = connectorConfig.get("topic.prefix"); + String tasksMax = connectorConfig.get("tasks.max"); + String databaseNames = connectorConfig.get("database.names"); + String[] databaseNamesArray = databaseNames.split(","); - @Override - public List getMetrics(String connectorName, JolokiaClient client, List attributes) { - String serverName = connectClusterState.connectorConfig(connectorName).get("topic.prefix"); - String task = connectClusterState.connectorConfig(connectorName).get("tasks.max"); - return client.getConnectorMetrics("sql_server", serverName, Integer.valueOf(task), attributes); + MetricsDescriptor.Connector connector = null; + List tasks = new ArrayList<>(); + + for (int task = 0; task < Integer.parseInt(tasksMax); task++) { + ObjectName objectName = getObjectNameWithTask("sql_server", "streaming", serverName, String.valueOf(task)); + Map connectionAttributes = getAttributes(MetricsAttributes.getConnectionAttributes(), objectName, connectorName, mBeanServer); + + connector = new MetricsDescriptor.Connector(connectionAttributes); + List databases = new ArrayList<>(); + + for (String databaseName : databaseNamesArray) { + ObjectName objectNameWithDatabase = getObjectNameWithDatabase("sql_server", "streaming", serverName, String.valueOf(task), databaseName); + Map connectorAttributes = getAttributes(MetricsAttributes.getConnectorAttributes(), objectNameWithDatabase, connectorName, mBeanServer); + databases.add(new MetricsDescriptor.Database(databaseName, connectorAttributes)); + } + tasks.add(new MetricsDescriptor.Task(task, databases)); + } + + return new MetricsDescriptor(connectorName, tasksMax, connector, tasks); } @Override diff --git a/debezium-connector-sqlserver/src/main/resources/META-INF/sqlserver-attributes.txt b/debezium-connector-sqlserver/src/main/resources/META-INF/sqlserver-attributes.txt deleted file mode 100644 index a9f39e8f7..000000000 --- a/debezium-connector-sqlserver/src/main/resources/META-INF/sqlserver-attributes.txt +++ /dev/null @@ -1,3 +0,0 @@ -Connected -MilliSecondsSinceLastEvent -TotalNumberOfEventsSeen \ No newline at end of file diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/rest/DebeziumSqlServerConnectorResourceIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/rest/DebeziumSqlServerConnectorResourceIT.java index 17d38a9da..7415ef6b6 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/rest/DebeziumSqlServerConnectorResourceIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/rest/DebeziumSqlServerConnectorResourceIT.java @@ -204,13 +204,15 @@ public void testMetricsEndpoint() { .get(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.CONNECTOR_METRICS_ENDPOINT, connectorName) .then().log().all() .statusCode(200) - .assertThat().body("size()", is(3)) - .body("[0].request.attribute", is("Connected")) - .body("[0].value", equalTo(true)) - .body("[1].request.attribute", is("MilliSecondsSinceLastEvent")) - .body("[1].value", equalTo(-1)) - .body("[2].request.attribute", is("TotalNumberOfEventsSeen")) - .body("[2].value", equalTo(0)); + .body("name", equalTo(connectorName)) + .body("connector.metrics.Connected", equalTo("true")) + .body("tasks[0].database[0].name", equalTo("testDB")) + .body("tasks[0].database[0].metrics.MilliSecondsSinceLastEvent", equalTo("-1")) + .body("tasks[0].database[0].metrics.TotalNumberOfEventsSeen", equalTo("0")) + .body("tasks[0].database[1].name", equalTo("testDB2")) + .body("tasks[0].database[1].metrics.MilliSecondsSinceLastEvent", equalTo("-1")) + .body("tasks[0].database[1].metrics.TotalNumberOfEventsSeen", equalTo("0")); + } public static ConnectorConfiguration getSqlServerConnectorConfiguration(int id, String... options) { diff --git a/debezium-core/src/main/java/io/debezium/rest/MetricsResource.java b/debezium-core/src/main/java/io/debezium/rest/MetricsResource.java index 508e26b52..ed36963e2 100644 --- a/debezium-core/src/main/java/io/debezium/rest/MetricsResource.java +++ b/debezium-core/src/main/java/io/debezium/rest/MetricsResource.java @@ -5,8 +5,19 @@ */ package io.debezium.rest; +import java.lang.management.ManagementFactory; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import javax.management.AttributeNotFoundException; +import javax.management.InstanceNotFoundException; +import javax.management.IntrospectionException; +import javax.management.MBeanException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.ReflectionException; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.PathParam; @@ -14,30 +25,63 @@ import javax.ws.rs.core.MediaType; import org.apache.kafka.connect.connector.Connector; -import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import io.debezium.rest.jolokia.JolokiaAttributes; -import io.debezium.rest.jolokia.JolokiaClient; +import io.debezium.rest.metrics.MetricsDescriptor; public interface MetricsResource { - Connector getConnector(); - - String getAttributesFilePath(); - - List getMetrics(String connectorName, JolokiaClient client, List attributes); + Logger LOGGER = LoggerFactory.getLogger(MetricsResource.class); String CONNECTOR_METRICS_ENDPOINT = "/connectors/{connector-name}/metrics"; + Connector getConnector(); + + MetricsDescriptor getMetrics(String connectorName, MBeanServer mBeanServer) + throws MalformedObjectNameException, InstanceNotFoundException, IntrospectionException, ReflectionException, AttributeNotFoundException, MBeanException; + + default ObjectName getObjectName(String connector, String context, String serverName) throws MalformedObjectNameException { + return new ObjectName(String.format("debezium.%s:type=connector-metrics,context=%s,server=%s", connector, context, serverName)); + } + + default ObjectName getObjectNameWithTask(String connector, String context, String serverName, String task) throws MalformedObjectNameException { + return new ObjectName( + String.format("debezium.%s:type=connector-metrics,context=%s,server=%s,task=%s", connector, context, serverName, task)); + } + + default ObjectName getObjectNameWithDatabase(String connector, String context, String serverName, String task, String databaseName) + throws MalformedObjectNameException { + return new ObjectName( + String.format("debezium.%s:type=connector-metrics,context=%s,server=%s,task=%s,database=%s", connector, context, serverName, task, databaseName)); + } + + default Map getAttributes(List attributes, ObjectName objectName, String connectorName, MBeanServer mBeanServer) { + return attributes.stream() + .collect(Collectors.toMap( + attribute -> attribute, + attribute -> getAttributeValue(attribute, objectName, connectorName, mBeanServer))); + } + + default String getAttributeValue(String attribute, ObjectName objectName, String connectorName, MBeanServer mBeanServer) { + try { + return mBeanServer.getAttribute(objectName, attribute).toString(); + } + catch (Exception e) { + throw new RuntimeException("Failed to get attribute " + attribute + " for connector " + connectorName + e); + } + } + @GET @Path(CONNECTOR_METRICS_ENDPOINT) @Produces(MediaType.APPLICATION_JSON) - default List getConnectorMetrics(@PathParam("connector-name") String connectorName) { + default MetricsDescriptor getConnectorMetrics(@PathParam("connector-name") String connectorName) + throws ReflectionException, MalformedObjectNameException, InstanceNotFoundException, IntrospectionException, AttributeNotFoundException, MBeanException { + if (getConnector() == null) { throw new RuntimeException("Unable to fetch metrics for connector " + connectorName + " as the connector is not available."); } - JolokiaClient jolokiaClient = new JolokiaClient(); - JolokiaAttributes jolokiaAttributes = new JolokiaAttributes(getAttributesFilePath()); - List attributes = jolokiaAttributes.getAttributeNames(); - return getMetrics(connectorName, jolokiaClient, attributes); + + MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + return getMetrics(connectorName, mBeanServer); } } diff --git a/debezium-core/src/main/java/io/debezium/rest/jolokia/JolokiaAttributes.java b/debezium-core/src/main/java/io/debezium/rest/jolokia/JolokiaAttributes.java deleted file mode 100644 index 2cb5d6500..000000000 --- a/debezium-core/src/main/java/io/debezium/rest/jolokia/JolokiaAttributes.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.rest.jolokia; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -public class JolokiaAttributes { - private List attributeNames = new ArrayList<>(); - - public JolokiaAttributes(String attributesFilePath) { - try (InputStream stream = getClass().getClassLoader().getResourceAsStream(attributesFilePath)) { - if (stream != null) { - try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) { - String attribute; - while ((attribute = reader.readLine()) != null) { - attributeNames.add(attribute); - } - } - } - else { - attributeNames = Collections.emptyList(); - } - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - - public List getAttributeNames() { - return attributeNames; - } -} diff --git a/debezium-core/src/main/java/io/debezium/rest/jolokia/JolokiaClient.java b/debezium-core/src/main/java/io/debezium/rest/jolokia/JolokiaClient.java deleted file mode 100644 index 0a23714d4..000000000 --- a/debezium-core/src/main/java/io/debezium/rest/jolokia/JolokiaClient.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.rest.jolokia; - -import java.util.ArrayList; -import java.util.List; - -import javax.management.MalformedObjectNameException; - -import org.jolokia.client.BasicAuthenticator; -import org.jolokia.client.J4pClient; -import org.jolokia.client.exception.J4pException; -import org.jolokia.client.request.J4pReadRequest; -import org.jolokia.client.request.J4pReadResponse; -import org.json.simple.JSONObject; - -public class JolokiaClient { - public static final Integer DEFAULT_JOLOKIA_PORT = 8778; - public static final String DEFAULT_JOLOKIA_URL = "http://localhost:" + DEFAULT_JOLOKIA_PORT + "/jolokia"; - - public List getConnectorMetrics(String type, String server, Integer taskMax, List attributes) { - List metrics = new ArrayList<>(); - try { - J4pClient client = createJolokiaClient(); - String baseMbeanName = String.format("debezium.%s:type=connector-metrics,context=streaming,server=%s", type, server); - - if ("sql_server".equals(type) || "mongodb".equals(type)) { - for (int task = 0; task < Math.max(1, taskMax); task++) { - String mbeanName = String.format("%s,task=%s", baseMbeanName, task); - addMetrics(client, metrics, mbeanName, attributes); - } - } - else { - addMetrics(client, metrics, baseMbeanName, attributes); - } - } - catch (J4pException | MalformedObjectNameException e) { - throw new RuntimeException(e); - } - return metrics; - } - - private void addMetrics(J4pClient client, List metrics, String mbeanName, List attributes) throws J4pException, MalformedObjectNameException { - for (String attribute : attributes) { - J4pReadRequest request = new J4pReadRequest(mbeanName, attribute); - J4pReadResponse response = client.execute(request); - metrics.add(response.asJSONObject()); - } - } - - private J4pClient createJolokiaClient() { - return J4pClient.url(JolokiaClient.DEFAULT_JOLOKIA_URL) - .authenticator(new BasicAuthenticator().preemptive()) - .connectionTimeout(3000) - .build(); - } -} diff --git a/debezium-core/src/main/java/io/debezium/rest/metrics/MetricsAttributes.java b/debezium-core/src/main/java/io/debezium/rest/metrics/MetricsAttributes.java new file mode 100644 index 000000000..7aeb9fdf7 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/rest/metrics/MetricsAttributes.java @@ -0,0 +1,24 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.rest.metrics; + +import java.util.List; + +/** + * Class containing the attributes names for the metrics exposed via REST API + */ +public class MetricsAttributes { + private static final List CONNECTION_ATTRIBUTES = List.of("Connected"); + private static final List CONNECTOR_ATTRIBUTES = List.of("MilliSecondsSinceLastEvent", "TotalNumberOfEventsSeen"); + + public static List getConnectionAttributes() { + return CONNECTION_ATTRIBUTES; + } + + public static List getConnectorAttributes() { + return CONNECTOR_ATTRIBUTES; + } +} diff --git a/debezium-core/src/main/java/io/debezium/rest/metrics/MetricsDescriptor.java b/debezium-core/src/main/java/io/debezium/rest/metrics/MetricsDescriptor.java new file mode 100644 index 000000000..d0377a61c --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/rest/metrics/MetricsDescriptor.java @@ -0,0 +1,77 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.rest.metrics; + +import java.util.List; +import java.util.Map; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Base class for JSON payloads describing connector metrics + * + * @author Anisha Mohanty + */ +public class MetricsDescriptor { + + @JsonProperty("name") + private String name; + + @JsonProperty("tasks.max") + private String tasksMax; + + @JsonProperty("connector") + private Connector connector; + + @JsonProperty("tasks") + private List tasks; + + public MetricsDescriptor(String name, String tasksMax, Connector connector, List tasks) { + this.name = name; + this.tasksMax = tasksMax; + this.connector = connector; + this.tasks = tasks; + } + + public static class Connector { + @JsonProperty() + private Map metrics; + + public Connector(Map metrics) { + this.metrics = metrics; + } + } + + @JsonInclude(JsonInclude.Include.NON_EMPTY) + public static class Task { + + @JsonProperty("id") + private int id; + + @JsonProperty("database") + private List databases; + + public Task(int id, List databases) { + this.id = id; + this.databases = databases; + } + } + + @JsonInclude(JsonInclude.Include.NON_EMPTY) + public static class Database { + @JsonProperty("name") + private String name; + + @JsonProperty() + private Map metrics; + + public Database(String name, Map metrics) { + this.name = name; + this.metrics = metrics; + } + } +}