From 448330fd079692d9b1678991d9b9dab91df17bff Mon Sep 17 00:00:00 2001 From: Peter Goransson Date: Thu, 5 Jul 2018 14:22:38 -0400 Subject: [PATCH] DBZ-789 Expose more granular snapshot metrics via JMX --- .../io/debezium/connector/mysql/SnapshotReader.java | 2 ++ .../connector/mysql/SnapshotReaderMetrics.java | 13 +++++++++++++ .../mysql/SnapshotReaderMetricsMXBean.java | 3 +++ 3 files changed, 18 insertions(+) diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java index d922d64be..37e9ee7f7 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java @@ -567,6 +567,7 @@ protected void execute() { long stop = clock.currentTimeInMillis(); logger.info("Step {}: - {} of {} rows scanned from table '{}' after {}", stepNum, rowNum, rowCountStr, tableId, Strings.duration(stop - start)); + metrics.setRowsScanned(tableId.toString(), rowNum); } } @@ -575,6 +576,7 @@ protected void execute() { long stop = clock.currentTimeInMillis(); logger.info("Step {}: - Completed scanning a total of {} rows from table '{}' after {}", stepNum, rowNum, tableId, Strings.duration(stop - start)); + metrics.setRowsScanned(tableId.toString(), rowNum); } } catch (InterruptedException e) { Thread.interrupted(); diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReaderMetrics.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReaderMetrics.java index 1ca85130c..d5370946d 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReaderMetrics.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReaderMetrics.java @@ -5,6 +5,8 @@ */ package io.debezium.connector.mysql; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -24,6 +26,8 @@ class SnapshotReaderMetrics extends Metrics implements SnapshotReaderMetricsMXBe private final AtomicBoolean snapshotAborted = new AtomicBoolean(); private final AtomicLong startTime = new AtomicLong(); private final AtomicLong stopTime = new AtomicLong(); + private final ConcurrentMap rowsScanned = new ConcurrentHashMap(); + private final MySqlSchema schema; private final Clock clock; @@ -118,4 +122,13 @@ public void abortSnapshot() { public String[] getMonitoredTables() { return schema.monitoredTablesAsStringArray(); } + + public void setRowsScanned(String tableId, Long numRows) { + rowsScanned.put(tableId, numRows); + } + + @Override + public ConcurrentMap getRowsScanned() { + return rowsScanned; + } } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReaderMetricsMXBean.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReaderMetricsMXBean.java index f551564fc..930b1e533 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReaderMetricsMXBean.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReaderMetricsMXBean.java @@ -5,6 +5,8 @@ */ package io.debezium.connector.mysql; +import java.util.Map; + /** * @author Randall Hauch */ @@ -17,4 +19,5 @@ public interface SnapshotReaderMetricsMXBean extends ReaderMetricsMXBean { boolean getSnapshotAborted(); boolean getSnapshotCompleted(); long getSnapshotDurationInSeconds(); + Map getRowsScanned(); }