DBZ-5235 Use JMX to get queue size

This commit is contained in:
Jiri Pechanec 2022-07-12 16:38:27 +02:00
parent d84559f147
commit ad7e65fbac
2 changed files with 10 additions and 7 deletions

View File

@ -13,12 +13,10 @@
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
/**
* Reads debezium source pipeline metrics.
@ -33,8 +31,6 @@ public class DebeziumMetrics {
protected static final Logger LOGGER = LoggerFactory.getLogger(DebeziumMetrics.class);
public static final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
@ConfigProperty(name = "debezium.source.max.queue.size", defaultValue = CommonConnectorConfig.DEFAULT_MAX_QUEUE_SIZE + "")
int maxQueueSize;
private ObjectName snapshotMetricsObjectName;
private ObjectName streamingMetricsObjectName;
@ -77,7 +73,12 @@ public ObjectName getStreamingMetricsObjectName() {
}
public int maxQueueSize() {
return maxQueueSize;
try {
return (int) mbeanServer.getAttribute(getStreamingMetricsObjectName(), "QueueTotalCapacity");
}
catch (Exception e) {
throw new DebeziumException(e);
}
}
public boolean snapshotRunning() {
@ -108,7 +109,7 @@ public int streamingQueueRemainingCapacity() {
}
public int streamingQueueCurrentSize() {
return maxQueueSize - streamingQueueRemainingCapacity();
return maxQueueSize() - streamingQueueRemainingCapacity();
}
public long streamingMilliSecondsBehindSource() {

View File

@ -16,6 +16,7 @@
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
@ -79,7 +80,8 @@ public void testDebeziumMetricsWithPostgres() {
// snapshot process finished
// and consuming events finished!
return metrics.snapshotCompleted()
&& metrics.streamingQueueCurrentSize() == 0;
&& metrics.streamingQueueCurrentSize() == 0
&& metrics.maxQueueSize() == CommonConnectorConfig.DEFAULT_MAX_QUEUE_SIZE;
}
catch (Exception e) {
return false;