DBZ-6764 Fix waitForStreamingRunning/isStreamingRunning

This commit is contained in:
rkerner 2023-11-29 11:42:07 +01:00 committed by Jiri Pechanec
parent a12b1a9872
commit e06f96f339
7 changed files with 63 additions and 181 deletions

View File

@ -217,7 +217,7 @@ public void testMetricsEndpoint() throws InterruptedException {
RestExtensionTestInfrastructure.getDebeziumContainer().ensureConnectorState(connectorName, Connector.State.RUNNING);
RestExtensionTestInfrastructure.waitForConnectorTaskStatus(connectorName, 0, Connector.State.RUNNING);
RestExtensionTestInfrastructure.waitForStreamingRunning("mysql", "dbserver1");
RestExtensionTestInfrastructure.getDebeziumContainer().waitForStreamingRunning("mysql", config.asProperties().getProperty("topic.prefix"));
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
@ -228,8 +228,8 @@ public void testMetricsEndpoint() throws InterruptedException {
.body("name", equalTo(connectorName))
.body("connector.metrics.Connected", equalTo("true"))
.body("tasks[0].id", equalTo(0))
.body("tasks[0].namespaces[0].metrics.MilliSecondsSinceLastEvent", equalTo("-1"))
.body("tasks[0].namespaces[0].metrics.TotalNumberOfEventsSeen", equalTo("0"));
.body("tasks[0].namespaces[0].metrics.MilliSecondsSinceLastEvent", equalTo("0"))
.body("tasks[0].namespaces[0].metrics.TotalNumberOfEventsSeen", equalTo("14234"));
}
public static ConnectorConfiguration getMySqlConnectorConfiguration(int id, String... options) {

View File

@ -48,14 +48,6 @@
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
<!-- Jolokia Integration -->
<!-- Temporary workaround for implementing metrics for REST endpoint, until the implementation is switched to MBeanServer -->
<dependency>
<groupId>org.jolokia</groupId>
<artifactId>jolokia-client-java</artifactId>
<version>${version.jolokia.client}</version>
</dependency>
<!-- OpenTelemetry Integration -->
<dependency>
<groupId>io.opentelemetry</groupId>

View File

@ -1,42 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.rest.jolokia;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class JolokiaAttributes {
private List<String> attributeNames = new ArrayList<>();
public JolokiaAttributes(String attributesFilePath) {
try (InputStream stream = getClass().getClassLoader().getResourceAsStream(attributesFilePath)) {
if (stream != null) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
String attribute;
while ((attribute = reader.readLine()) != null) {
attributeNames.add(attribute);
}
}
}
else {
attributeNames = Collections.emptyList();
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
public List<String> getAttributeNames() {
return attributeNames;
}
}

View File

@ -1,60 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.rest.jolokia;
import java.util.ArrayList;
import java.util.List;
import javax.management.MalformedObjectNameException;
import org.jolokia.client.BasicAuthenticator;
import org.jolokia.client.J4pClient;
import org.jolokia.client.exception.J4pException;
import org.jolokia.client.request.J4pReadRequest;
import org.jolokia.client.request.J4pReadResponse;
import org.json.simple.JSONObject;
public class JolokiaClient {
public static final Integer DEFAULT_JOLOKIA_PORT = 8778;
public static final String DEFAULT_JOLOKIA_URL = "http://localhost:" + DEFAULT_JOLOKIA_PORT + "/jolokia";
public List<JSONObject> getConnectorMetrics(String type, String server, Integer taskMax, List<String> attributes) {
List<JSONObject> metrics = new ArrayList<>();
try {
J4pClient client = createJolokiaClient();
String baseMbeanName = String.format("debezium.%s:type=connector-metrics,context=streaming,server=%s", type, server);
if ("sql_server".equals(type) || "mongodb".equals(type)) {
for (int task = 0; task < Math.max(1, taskMax); task++) {
String mbeanName = String.format("%s,task=%s", baseMbeanName, task);
addMetrics(client, metrics, mbeanName, attributes);
}
}
else {
addMetrics(client, metrics, baseMbeanName, attributes);
}
}
catch (J4pException | MalformedObjectNameException e) {
throw new RuntimeException(e);
}
return metrics;
}
private void addMetrics(J4pClient client, List<JSONObject> metrics, String mbeanName, List<String> attributes) throws J4pException, MalformedObjectNameException {
for (String attribute : attributes) {
J4pReadRequest request = new J4pReadRequest(mbeanName, attribute);
J4pReadResponse response = client.execute(request);
metrics.add(response.asJSONObject());
}
}
private J4pClient createJolokiaClient() {
return J4pClient.url(JolokiaClient.DEFAULT_JOLOKIA_URL)
.authenticator(new BasicAuthenticator().preemptive())
.connectionTimeout(3000)
.build();
}
}

View File

@ -13,6 +13,15 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.management.InstanceNotFoundException;
import javax.management.JMException;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.awaitility.Awaitility;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
@ -44,6 +53,7 @@ public class DebeziumContainer extends GenericContainer<DebeziumContainer> {
private static final String DEBEZIUM_NIGHTLY_TAG = "nightly";
private static final int KAFKA_CONNECT_PORT = 8083;
private static final Integer DEFAULT_JMX_PORT = 13333;
private static final Duration DEBEZIUM_CONTAINER_STARTUP_TIMEOUT = Duration.ofSeconds(waitTimeForRecords() * 30);
private static final String TEST_PROPERTY_PREFIX = "debezium.test.";
public static final MediaType JSON = MediaType.get("application/json; charset=utf-8");
@ -114,9 +124,16 @@ public DebeziumContainer enableApicurioConverters() {
return self();
}
public DebeziumContainer enableJolokia() {
withEnv("ENABLE_JOLOKIA", "true");
withExposedPorts(8778);
public DebeziumContainer enableJMX() {
return enableJMX(DEFAULT_JMX_PORT);
}
public DebeziumContainer enableJMX(Integer jmxPort) {
withEnv("JMXHOST", "localhost")
.withEnv("JMXPORT", String.valueOf(jmxPort))
.withEnv("JMXAUTH", "false")
.withEnv("JMXSSL", "false");
addFixedExposedPort(jmxPort, jmxPort);
return self();
}
@ -400,6 +417,45 @@ public void ensureConnectorConfigProperty(String connectorName, String propertyN
.until(() -> Objects.equals(expectedValue, getConnectorConfigProperty(connectorName, propertyName)));
}
public void waitForStreamingRunning(String connectorTypeId, String server) throws InterruptedException {
waitForStreamingRunning(connectorTypeId, server, "streaming");
}
public void waitForStreamingRunning(String connectorTypeId, String server, String contextName) {
waitForStreamingRunning(connectorTypeId, server, contextName, null);
}
public void waitForStreamingRunning(String connectorTypeId, String server, String contextName, String task) {
Awaitility.await()
.atMost(120, TimeUnit.SECONDS)
.ignoreException(InstanceNotFoundException.class)
.until(() -> isStreamingRunning(connectorTypeId, server, contextName, task));
}
public boolean isStreamingRunning(String connectorTypeId, String server, String contextName, String task) throws JMException {
MBeanServerConnection mBeanServerConnection;
try {
JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:" + DEFAULT_JMX_PORT + "/jmxrmi");
try (JMXConnector connectorJmx = JMXConnectorFactory.connect(url, null)) {
mBeanServerConnection = connectorJmx.getMBeanServerConnection();
ObjectName streamingMetricsObjectName = task != null ? getStreamingMetricsObjectName(connectorTypeId, server, contextName, task)
: getStreamingMetricsObjectName(connectorTypeId, server, contextName);
return (boolean) mBeanServerConnection.getAttribute(streamingMetricsObjectName, "Connected");
}
}
catch (IOException e) {
throw new RuntimeException("Unable to connect to JMX service", e);
}
}
private static ObjectName getStreamingMetricsObjectName(String connector, String server, String context) throws MalformedObjectNameException {
return new ObjectName("debezium." + connector + ":type=connector-metrics,context=" + context + ",server=" + server);
}
private static ObjectName getStreamingMetricsObjectName(String connector, String server, String context, String task) throws MalformedObjectNameException {
return new ObjectName("debezium." + connector + ":type=connector-metrics,context=" + context + ",server=" + server + ",task=" + task);
}
public static ConnectorConfiguration getPostgresConnectorConfiguration(PostgreSQLContainer<?> postgresContainer, int id, String... options) {
final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(postgresContainer)
.with("topic.prefix", "dbserver" + id)

View File

@ -5,7 +5,6 @@
*/
package io.debezium.testing.testcontainers.testhelper;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
@ -13,15 +12,6 @@
import java.util.function.Supplier;
import java.util.stream.Stream;
import javax.management.InstanceNotFoundException;
import javax.management.JMException;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -185,15 +175,10 @@ public static void setupDebeziumContainer(String connectorVersion, String restEx
.withBuildArg("DEBEZIUM_VERSION", debeziumVersion))
.withEnv("ENABLE_DEBEZIUM_SCRIPTING", "true")
.withEnv("CONNECT_REST_EXTENSION_CLASSES", restExtensionClasses)
.withEnv("ENABLE_JOLOKIA", "true")
.withEnv("JMXHOST", "0.0.0.0")
.withEnv("JMXPORT", "3000")
.withEnv("JMXAUTH", "false")
.withEnv("JMXSSL", "false")
.withExposedPorts(8083, 8778, 3000)
.withNetwork(NETWORK)
.withKafka(KAFKA_CONTAINER.getNetwork(), KAFKA_HOSTNAME + ":9092")
.withLogConsumer(new Slf4jLogConsumer(LOGGER))
.enableJMX()
.dependsOn(KAFKA_CONTAINER);
}
@ -232,50 +217,4 @@ public static void waitForConnectorTaskStatus(String connectorName, int taskNumb
.atMost(120, TimeUnit.SECONDS)
.until(() -> RestExtensionTestInfrastructure.getDebeziumContainer().getConnectorTaskState(connectorName, taskNumber) == state);
}
public static void waitForStreamingRunning(String connector, String server) throws InterruptedException {
waitForStreamingRunning(connector, server, "streaming");
}
public static void waitForStreamingRunning(String connector, String server, String contextName) {
waitForStreamingRunning(connector, server, contextName, null);
}
public static void waitForStreamingRunning(String connector, String server, String contextName, String task) {
Awaitility.await()
.atMost(120, TimeUnit.SECONDS)
.ignoreException(InstanceNotFoundException.class)
.until(() -> isStreamingRunning(connector, server, contextName, task));
}
public static boolean isStreamingRunning(String connector, String server, String contextName, String task) throws IOException {
JMXServiceURL url;
JMXConnector connectorJmx;
MBeanServerConnection mBeanServerConnection;
try {
url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://0.0.0.0:" + getDebeziumContainer().getMappedPort(3000) + "/jmxrmi");
connectorJmx = JMXConnectorFactory.connect(url, null);
mBeanServerConnection = connectorJmx.getMBeanServerConnection();
}
catch (IOException e) {
throw new RuntimeException("Unable to connect to JMX service", e);
}
try {
ObjectName streamingMetricsObjectName = task != null ? getStreamingMetricsObjectName(connector, contextName, server, task)
: getStreamingMetricsObjectName(connector, contextName, server);
return (boolean) mBeanServerConnection.getAttribute(streamingMetricsObjectName, "Connected");
}
catch (JMException ignored) {
}
return false;
}
public static ObjectName getStreamingMetricsObjectName(String connector, String server, String context) throws MalformedObjectNameException {
return new ObjectName("debezium." + connector + ":type=connector-metrics,context=" + context + ",server=" + server);
}
public static ObjectName getStreamingMetricsObjectName(String connector, String server, String context, String task) throws MalformedObjectNameException {
return new ObjectName("debezium." + connector + ":type=connector-metrics,context=" + context + ",server=" + server + ",task=" + task);
}
}

View File

@ -155,9 +155,6 @@
<!-- MySQL server image name -->
<mysql.server.image.source>container-registry.oracle.com/mysql/community-server</mysql.server.image.source>
<!-- Jolokia -->
<version.jolokia.client>1.7.2</version.jolokia.client>
</properties>
<modules>