diff --git a/debezium-server/debezium-server-core/src/main/java/io/debezium/server/DebeziumMetrics.java b/debezium-server/debezium-server-core/src/main/java/io/debezium/server/DebeziumMetrics.java index 0e515d45d..3fb578899 100644 --- a/debezium-server/debezium-server-core/src/main/java/io/debezium/server/DebeziumMetrics.java +++ b/debezium-server/debezium-server-core/src/main/java/io/debezium/server/DebeziumMetrics.java @@ -13,12 +13,10 @@ import javax.management.MBeanServer; import javax.management.ObjectName; -import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.debezium.DebeziumException; -import io.debezium.config.CommonConnectorConfig; /** * Reads debezium source pipeline metrics. @@ -33,8 +31,6 @@ public class DebeziumMetrics { protected static final Logger LOGGER = LoggerFactory.getLogger(DebeziumMetrics.class); public static final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); - @ConfigProperty(name = "debezium.source.max.queue.size", defaultValue = CommonConnectorConfig.DEFAULT_MAX_QUEUE_SIZE + "") - int maxQueueSize; private ObjectName snapshotMetricsObjectName; private ObjectName streamingMetricsObjectName; @@ -77,7 +73,12 @@ public ObjectName getStreamingMetricsObjectName() { } public int maxQueueSize() { - return maxQueueSize; + try { + return (int) mbeanServer.getAttribute(getStreamingMetricsObjectName(), "QueueTotalCapacity"); + } + catch (Exception e) { + throw new DebeziumException(e); + } } public boolean snapshotRunning() { @@ -108,7 +109,7 @@ public int streamingQueueRemainingCapacity() { } public int streamingQueueCurrentSize() { - return maxQueueSize - streamingQueueRemainingCapacity(); + return maxQueueSize() - streamingQueueRemainingCapacity(); } public long streamingMilliSecondsBehindSource() { diff --git a/debezium-server/debezium-server-core/src/test/java/io/debezium/server/DebeziumServerIT.java b/debezium-server/debezium-server-core/src/test/java/io/debezium/server/DebeziumServerIT.java index 6ced12134..148674ada 100644 --- a/debezium-server/debezium-server-core/src/test/java/io/debezium/server/DebeziumServerIT.java +++ b/debezium-server/debezium-server-core/src/test/java/io/debezium/server/DebeziumServerIT.java @@ -16,6 +16,7 @@ import org.junit.jupiter.api.condition.DisabledIfSystemProperty; import org.junit.jupiter.api.condition.EnabledIfSystemProperty; +import io.debezium.config.CommonConnectorConfig; import io.debezium.server.events.ConnectorCompletedEvent; import io.debezium.server.events.ConnectorStartedEvent; import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager; @@ -79,7 +80,8 @@ public void testDebeziumMetricsWithPostgres() { // snapshot process finished // and consuming events finished! return metrics.snapshotCompleted() - && metrics.streamingQueueCurrentSize() == 0; + && metrics.streamingQueueCurrentSize() == 0 + && metrics.maxQueueSize() == CommonConnectorConfig.DEFAULT_MAX_QUEUE_SIZE; } catch (Exception e) { return false;