From 213d4a42caa9c8e329546a5f608fe20e7c07197f Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Tue, 29 Jan 2019 15:31:32 +0100 Subject: [PATCH] DBZ-1040 Metrics for internal queue --- .../debezium/connector/mysql/AbstractReader.java | 14 ++++++++++++++ .../io/debezium/connector/mysql/BinlogReader.java | 2 +- .../connector/mysql/BinlogReaderMetrics.java | 5 +++-- .../debezium/connector/mysql/SnapshotReader.java | 2 +- .../connector/mysql/SnapshotReaderMetrics.java | 5 +++-- .../sqlserver/SqlServerConnectorTask.java | 2 +- .../debezium/connector/base/ChangeEventQueue.java | 14 +++++++++++++- .../connector/base/ChangeEventQueueMetrics.java | 12 ++++++++++++ .../pipeline/ChangeEventSourceCoordinator.java | 7 ++++--- .../metrics/ChangeEventSourceMetricsMXBean.java | 2 ++ .../io/debezium/pipeline/metrics/Metrics.java | 15 ++++++++++++++- .../metrics/SnapshotChangeEventSourceMetrics.java | 5 +++-- .../StreamingChangeEventSourceMetrics.java | 5 +++-- 13 files changed, 74 insertions(+), 16 deletions(-) create mode 100644 debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueueMetrics.java diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java index ed78c6e9b..94104e05d 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java @@ -22,6 +22,7 @@ import com.github.shyiko.mysql.binlog.network.ServerException; import io.debezium.config.ConfigurationDefaults; +import io.debezium.connector.base.ChangeEventQueueMetrics; import io.debezium.time.Temporals; import io.debezium.util.Clock; import io.debezium.util.Metronome; @@ -49,6 +50,7 @@ public abstract class AbstractReader implements Reader { private final Metronome metronome; private final AtomicReference uponCompletion = new AtomicReference<>(); private final Duration pollInterval; + protected final ChangeEventQueueMetrics changeEventQueueMetrics; private final HaltingPredicate acceptAndContinue; @@ -71,6 +73,18 @@ public AbstractReader(String name, MySqlTaskContext context, HaltingPredicate ac this.pollInterval = context.getConnectorConfig().getPollInterval(); this.metronome = Metronome.parker(pollInterval, Clock.SYSTEM); this.acceptAndContinue = acceptAndContinue == null? new AcceptAllPredicate() : acceptAndContinue; + this.changeEventQueueMetrics = new ChangeEventQueueMetrics() { + + @Override + public int totalCapacity() { + return context.getConnectorConfig().getMaxQueueSize(); + } + + @Override + public int remainingCapacity() { + return records.remainingCapacity(); + } + }; } @Override diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java index 84973134d..3e1d59484 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java @@ -246,7 +246,7 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException { client.setEventDeserializer(eventDeserializer); // Set up for JMX ... - metrics = new BinlogReaderMetrics(client, context, name); + metrics = new BinlogReaderMetrics(client, context, name, changeEventQueueMetrics); heartbeat = Heartbeat.create(context.config(), context.topicSelector().getHeartbeatTopic(), context.getConnectorConfig().getLogicalName()); } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReaderMetrics.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReaderMetrics.java index d3d683425..0a8e28bab 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReaderMetrics.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReaderMetrics.java @@ -11,6 +11,7 @@ import com.github.shyiko.mysql.binlog.BinaryLogClient; import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientStatistics; +import io.debezium.connector.base.ChangeEventQueueMetrics; import io.debezium.pipeline.metrics.Metrics; import io.debezium.util.Collect; @@ -28,8 +29,8 @@ class BinlogReaderMetrics extends Metrics implements BinlogReaderMetricsMXBean { private final AtomicLong numberOfNotWellFormedTransactions = new AtomicLong(); private final AtomicLong numberOfLargeTransactions = new AtomicLong(); - public BinlogReaderMetrics(BinaryLogClient client, MySqlTaskContext taskContext, String name) { - super(taskContext, name); + public BinlogReaderMetrics(BinaryLogClient client, MySqlTaskContext taskContext, String name, ChangeEventQueueMetrics changeEventQueueMetrics) { + super(taskContext, name, changeEventQueueMetrics); this.client = client; this.stats = new BinaryLogClientStatistics(client); this.schema = taskContext.dbSchema(); diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java index fe8cb0f89..4acc69b80 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java @@ -78,7 +78,7 @@ public SnapshotReader(String name, MySqlTaskContext context) { this.includeData = context.snapshotMode().includeData(); this.snapshotLockingMode = context.getConnectorConfig().getSnapshotLockingMode(); recorder = this::recordRowAsRead; - metrics = new SnapshotReaderMetrics(context, context.dbSchema()); + metrics = new SnapshotReaderMetrics(context, context.dbSchema(), changeEventQueueMetrics); } /** diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReaderMetrics.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReaderMetrics.java index 83faa33a6..4b02420ce 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReaderMetrics.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReaderMetrics.java @@ -7,6 +7,7 @@ import java.util.concurrent.atomic.AtomicBoolean; +import io.debezium.connector.base.ChangeEventQueueMetrics; import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics; /** @@ -19,8 +20,8 @@ class SnapshotReaderMetrics extends SnapshotChangeEventSourceMetrics implements private final MySqlSchema schema; - public SnapshotReaderMetrics(MySqlTaskContext taskContext, MySqlSchema schema) { - super(taskContext); + public SnapshotReaderMetrics(MySqlTaskContext taskContext, MySqlSchema schema, ChangeEventQueueMetrics changeEventQueueMetrics) { + super(taskContext, changeEventQueueMetrics); this.schema = schema; } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java index 01c069be9..de1642f1a 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java @@ -124,7 +124,7 @@ public void start(Configuration config) { dispatcher ); - coordinator.start(taskContext); + coordinator.start(taskContext, this.queue); } /** diff --git a/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.java b/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.java index e9ea9cba3..db3312e05 100644 --- a/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.java +++ b/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.java @@ -51,12 +51,13 @@ * producers to the consumer, a custom type wrapping source records * may be used. */ -public class ChangeEventQueue { +public class ChangeEventQueue implements ChangeEventQueueMetrics { private static final Logger LOGGER = LoggerFactory.getLogger(ChangeEventQueue.class); private final Duration pollInterval; private final int maxBatchSize; + private final int maxQueueSize; private final BlockingQueue queue; private final Metronome metronome; private final Supplier loggingContextSupplier; @@ -66,6 +67,7 @@ public class ChangeEventQueue { private ChangeEventQueue(Duration pollInterval, int maxQueueSize, int maxBatchSize, Supplier loggingContextSupplier) { this.pollInterval = pollInterval; this.maxBatchSize = maxBatchSize; + this.maxQueueSize = maxQueueSize; this.queue = new LinkedBlockingDeque<>(maxQueueSize); this.metronome = Metronome.sleeper(pollInterval, Clock.SYSTEM); this.loggingContextSupplier = loggingContextSupplier; @@ -168,4 +170,14 @@ private void throwProducerFailureIfPresent() { throw new ConnectException("An exception ocurred in the change event producer. This connector will be stopped.", producerFailure); } } + + @Override + public int totalCapacity() { + return maxQueueSize; + } + + @Override + public int remainingCapacity() { + return queue.remainingCapacity(); + } } diff --git a/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueueMetrics.java b/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueueMetrics.java new file mode 100644 index 000000000..1198e3436 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueueMetrics.java @@ -0,0 +1,12 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.base; + +public interface ChangeEventQueueMetrics { + + int totalCapacity(); + int remainingCapacity(); +} \ No newline at end of file diff --git a/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java b/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java index 2e91594d7..13c83ff03 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java @@ -15,6 +15,7 @@ import org.slf4j.LoggerFactory; import io.debezium.annotation.ThreadSafe; +import io.debezium.connector.base.ChangeEventQueueMetrics; import io.debezium.connector.common.CdcSourceTaskContext; import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics; import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics; @@ -60,9 +61,9 @@ public ChangeEventSourceCoordinator(OffsetContext previousOffset, ErrorHandler e this.eventDispatcher = eventDispatcher; } - public synchronized void start(T taskContext) { - this.snapshotMetrics = new SnapshotChangeEventSourceMetrics(taskContext); - this.streamingMetrics = new StreamingChangeEventSourceMetrics(taskContext); + public synchronized void start(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics) { + this.snapshotMetrics = new SnapshotChangeEventSourceMetrics(taskContext, changeEventQueueMetrics); + this.streamingMetrics = new StreamingChangeEventSourceMetrics(taskContext, changeEventQueueMetrics); running = true; // run the snapshot source on a separate thread so start() won't block diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/ChangeEventSourceMetricsMXBean.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/ChangeEventSourceMetricsMXBean.java index 38055660b..905abf8b9 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/metrics/ChangeEventSourceMetricsMXBean.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/ChangeEventSourceMetricsMXBean.java @@ -17,5 +17,7 @@ public interface ChangeEventSourceMetricsMXBean { long getTotalNumberOfEventsSeen(); long getNumberOfEventsSkipped(); String[] getMonitoredTables(); + int getQueueTotalCapacity(); + int getQueueRemainingCapacity(); void reset(); } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/Metrics.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/Metrics.java index 8c742a893..0dc335523 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/metrics/Metrics.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/Metrics.java @@ -16,6 +16,7 @@ import org.slf4j.Logger; import io.debezium.annotation.ThreadSafe; +import io.debezium.connector.base.ChangeEventQueueMetrics; import io.debezium.connector.common.CdcSourceTaskContext; import io.debezium.pipeline.source.spi.DataChangeEventListener; import io.debezium.pipeline.source.spi.EventMetadataProvider; @@ -38,12 +39,14 @@ public abstract class Metrics implements DataChangeEventListener, ChangeEventSou private final String contextName; protected final Clock clock; protected final CdcSourceTaskContext taskContext; + private final ChangeEventQueueMetrics changeEventQueueMetrics; private volatile ObjectName name; - protected Metrics(T taskContext, String contextName) { + protected Metrics(T taskContext, String contextName, ChangeEventQueueMetrics changeEventQueueMetrics) { this.contextName = contextName; this.taskContext = taskContext; this.clock = taskContext.getClock(); + this.changeEventQueueMetrics = changeEventQueueMetrics; } /** @@ -124,4 +127,14 @@ public void reset() { numberOfEventsSkipped.set(0); lastEvent = null; } + + @Override + public int getQueueTotalCapacity() { + return changeEventQueueMetrics.totalCapacity(); + } + + @Override + public int getQueueRemainingCapacity() { + return changeEventQueueMetrics.remainingCapacity(); + } } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/SnapshotChangeEventSourceMetrics.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/SnapshotChangeEventSourceMetrics.java index 3187736df..b607692d0 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/metrics/SnapshotChangeEventSourceMetrics.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/SnapshotChangeEventSourceMetrics.java @@ -15,6 +15,7 @@ import java.util.concurrent.atomic.AtomicLong; import io.debezium.annotation.ThreadSafe; +import io.debezium.connector.base.ChangeEventQueueMetrics; import io.debezium.connector.common.CdcSourceTaskContext; import io.debezium.pipeline.source.spi.SnapshotProgressListener; import io.debezium.relational.TableId; @@ -38,8 +39,8 @@ public class SnapshotChangeEventSourceMetrics extends Metrics implements Snapsho private final Set monitoredTables = Collections.synchronizedSet(new HashSet<>()); - public SnapshotChangeEventSourceMetrics(T taskContext) { - super(taskContext, "snapshot"); + public SnapshotChangeEventSourceMetrics(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics) { + super(taskContext, "snapshot", changeEventQueueMetrics); } @Override diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/StreamingChangeEventSourceMetrics.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/StreamingChangeEventSourceMetrics.java index a80ae41d3..49ca4734c 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/metrics/StreamingChangeEventSourceMetrics.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/StreamingChangeEventSourceMetrics.java @@ -14,6 +14,7 @@ import org.apache.kafka.connect.data.Struct; import io.debezium.annotation.ThreadSafe; +import io.debezium.connector.base.ChangeEventQueueMetrics; import io.debezium.connector.common.CdcSourceTaskContext; import io.debezium.pipeline.source.spi.DataChangeEventListener; import io.debezium.pipeline.source.spi.EventMetadataProvider; @@ -31,8 +32,8 @@ public class StreamingChangeEventSourceMetrics extends Metrics implements Stream private final AtomicReference> sourceEventPosition = new AtomicReference>(Collections.emptyMap()); private volatile String lastTransactionId; - public StreamingChangeEventSourceMetrics(T taskContext) { - super(taskContext, "streaming"); + public StreamingChangeEventSourceMetrics(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics) { + super(taskContext, "streaming", changeEventQueueMetrics); } @Override