DBZ-5235 read Debezium metrics from consumer

This commit is contained in:
Ismail Simsek 2022-06-08 15:28:17 +02:00 committed by Jiri Pechanec
parent 5573bb51ed
commit d84559f147
2 changed files with 153 additions and 0 deletions

View File

@ -0,0 +1,133 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server;
import java.lang.management.ManagementFactory;
import java.util.Objects;
import javax.enterprise.context.Dependent;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
/**
* Reads debezium source pipeline metrics.
* NOTE: calls for reading metrics should be made after debezium connector initialized,
* after connector registers metrics, otherwise it will throw `Debezium Mbean not found` error
*
* @author Ismail Simsek
*/
@Dependent
public class DebeziumMetrics {
protected static final Logger LOGGER = LoggerFactory.getLogger(DebeziumMetrics.class);
public static final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
@ConfigProperty(name = "debezium.source.max.queue.size", defaultValue = CommonConnectorConfig.DEFAULT_MAX_QUEUE_SIZE + "")
int maxQueueSize;
private ObjectName snapshotMetricsObjectName;
private ObjectName streamingMetricsObjectName;
private static ObjectName getDebeziumMbean(String context) {
ObjectName debeziumMbean = null;
for (ObjectName mbean : mbeanServer.queryNames(null, null)) {
if (mbean.getCanonicalName().contains("debezium.")
&& mbean.getCanonicalName().contains("type=connector-metrics")
&& mbean.getCanonicalName().contains("context=" + context)) {
LOGGER.debug("Using {} MBean to get {} metrics", mbean, context);
debeziumMbean = mbean;
break;
}
}
Objects.requireNonNull(debeziumMbean, "Debezium MBean (context=" + context + ") not found!");
return debeziumMbean;
}
public ObjectName getSnapshotMetricsObjectName() {
if (snapshotMetricsObjectName == null) {
snapshotMetricsObjectName = getDebeziumMbean("snapshot");
}
return snapshotMetricsObjectName;
}
public ObjectName getStreamingMetricsObjectName() {
if (streamingMetricsObjectName == null) {
streamingMetricsObjectName = getDebeziumMbean("streaming");
}
return streamingMetricsObjectName;
}
public int maxQueueSize() {
return maxQueueSize;
}
public boolean snapshotRunning() {
try {
return (boolean) mbeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotRunning");
}
catch (Exception e) {
throw new DebeziumException(e);
}
}
public boolean snapshotCompleted() {
try {
return (boolean) mbeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted");
}
catch (Exception e) {
throw new DebeziumException(e);
}
}
public int streamingQueueRemainingCapacity() {
try {
return (int) mbeanServer.getAttribute(getStreamingMetricsObjectName(), "QueueRemainingCapacity");
}
catch (Exception e) {
throw new DebeziumException(e);
}
}
public int streamingQueueCurrentSize() {
return maxQueueSize - streamingQueueRemainingCapacity();
}
public long streamingMilliSecondsBehindSource() {
try {
return (long) mbeanServer.getAttribute(getStreamingMetricsObjectName(), "MilliSecondsBehindSource");
}
catch (Exception e) {
throw new DebeziumException(e);
}
}
public void logMetrics() {
LOGGER.info("Debezium Metrics: snapshotCompleted={} snapshotRunning={} "
+ "streamingQueueCurrentSize={} streamingQueueRemainingCapacity={} maxQueueSize={} streamingMilliSecondsBehindSource={}",
this.snapshotCompleted(),
this.snapshotRunning(),
this.streamingQueueCurrentSize(),
this.streamingQueueRemainingCapacity(),
this.maxQueueSize(),
this.streamingMilliSecondsBehindSource());
}
}

View File

@ -39,6 +39,9 @@ public class DebeziumServerIT {
@Inject
DebeziumServer server;
@Inject
DebeziumMetrics metrics;
{
Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
}
@ -66,4 +69,21 @@ public void testPostgresWithJson() throws Exception {
Assertions.assertThat(((String) testConsumer.getValues().get(MESSAGE_COUNT - 1))).contains(
"\"after\":{\"id\":1004,\"first_name\":\"Anne\",\"last_name\":\"Kretchmar\",\"email\":\"annek@noanswer.org\"}");
}
@Test
public void testDebeziumMetricsWithPostgres() {
Testing.Print.enable();
Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> {
try {
// snapshot process finished
// and consuming events finished!
return metrics.snapshotCompleted()
&& metrics.streamingQueueCurrentSize() == 0;
}
catch (Exception e) {
return false;
}
});
}
}