DBZ-6764 Connector metrics implementation for REST extension

This commit is contained in:
ani-sha 2023-11-24 12:56:34 +05:30 committed by Jiri Pechanec
parent 5841eacb79
commit a12b1a9872
9 changed files with 128 additions and 0 deletions

View File

@ -0,0 +1,3 @@
Connected
MilliSecondsSinceLastEvent
TotalNumberOfEventsSeen

View File

@ -0,0 +1,3 @@
Connected
MilliSecondsSinceLastEvent
TotalNumberOfEventsSeen

View File

@ -0,0 +1,3 @@
Connected
MilliSecondsSinceLastEvent
TotalNumberOfEventsSeen

View File

@ -0,0 +1,3 @@
Connected
MilliSecondsSinceLastEvent
TotalNumberOfEventsSeen

View File

@ -0,0 +1,3 @@
Connected
MilliSecondsSinceLastEvent
TotalNumberOfEventsSeen

View File

@ -48,6 +48,14 @@
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
<!-- Jolokia Integration -->
<!-- Temporary workaround for implementing metrics for REST endpoint, until the implementation is switched to MBeanServer -->
<dependency>
<groupId>org.jolokia</groupId>
<artifactId>jolokia-client-java</artifactId>
<version>${version.jolokia.client}</version>
</dependency>
<!-- OpenTelemetry Integration -->
<dependency>
<groupId>io.opentelemetry</groupId>

View File

@ -0,0 +1,42 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.rest.jolokia;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class JolokiaAttributes {
private List<String> attributeNames = new ArrayList<>();
public JolokiaAttributes(String attributesFilePath) {
try (InputStream stream = getClass().getClassLoader().getResourceAsStream(attributesFilePath)) {
if (stream != null) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
String attribute;
while ((attribute = reader.readLine()) != null) {
attributeNames.add(attribute);
}
}
}
else {
attributeNames = Collections.emptyList();
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
public List<String> getAttributeNames() {
return attributeNames;
}
}

View File

@ -0,0 +1,60 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.rest.jolokia;
import java.util.ArrayList;
import java.util.List;
import javax.management.MalformedObjectNameException;
import org.jolokia.client.BasicAuthenticator;
import org.jolokia.client.J4pClient;
import org.jolokia.client.exception.J4pException;
import org.jolokia.client.request.J4pReadRequest;
import org.jolokia.client.request.J4pReadResponse;
import org.json.simple.JSONObject;
public class JolokiaClient {
public static final Integer DEFAULT_JOLOKIA_PORT = 8778;
public static final String DEFAULT_JOLOKIA_URL = "http://localhost:" + DEFAULT_JOLOKIA_PORT + "/jolokia";
public List<JSONObject> getConnectorMetrics(String type, String server, Integer taskMax, List<String> attributes) {
List<JSONObject> metrics = new ArrayList<>();
try {
J4pClient client = createJolokiaClient();
String baseMbeanName = String.format("debezium.%s:type=connector-metrics,context=streaming,server=%s", type, server);
if ("sql_server".equals(type) || "mongodb".equals(type)) {
for (int task = 0; task < Math.max(1, taskMax); task++) {
String mbeanName = String.format("%s,task=%s", baseMbeanName, task);
addMetrics(client, metrics, mbeanName, attributes);
}
}
else {
addMetrics(client, metrics, baseMbeanName, attributes);
}
}
catch (J4pException | MalformedObjectNameException e) {
throw new RuntimeException(e);
}
return metrics;
}
private void addMetrics(J4pClient client, List<JSONObject> metrics, String mbeanName, List<String> attributes) throws J4pException, MalformedObjectNameException {
for (String attribute : attributes) {
J4pReadRequest request = new J4pReadRequest(mbeanName, attribute);
J4pReadResponse response = client.execute(request);
metrics.add(response.asJSONObject());
}
}
private J4pClient createJolokiaClient() {
return J4pClient.url(JolokiaClient.DEFAULT_JOLOKIA_URL)
.authenticator(new BasicAuthenticator().preemptive())
.connectionTimeout(3000)
.build();
}
}

View File

@ -155,6 +155,9 @@
<!-- MySQL server image name -->
<mysql.server.image.source>container-registry.oracle.com/mysql/community-server</mysql.server.image.source>
<!-- Jolokia -->
<version.jolokia.client>1.7.2</version.jolokia.client>
</properties>
<modules>