From edec1c30908fcc6c92b3fb43431e9da143adf397 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Wed, 5 Dec 2018 09:02:54 +0100 Subject: [PATCH] DBZ-978 Added last event and captured tables metrics --- .../connector/mongodb/MongoDbTaskContext.java | 4 ++- .../connector/mysql/MySqlTaskContext.java | 4 ++- .../postgresql/PostgresTaskContext.java | 3 ++- .../common/CdcSourceTaskContext.java | 26 ++++++++++++++++++- .../ChangeEventSourceCoordinator.java | 4 +++ .../io/debezium/pipeline/EventDispatcher.java | 24 ++++++++++++++--- .../ChangeEventSourceMetricsMXBean.java | 1 + .../io/debezium/pipeline/metrics/Metrics.java | 25 +++++++++++++++--- .../SnapshotChangeEventSourceMetrics.java | 2 -- .../StreamingChangeEventSourceMetrics.java | 8 +++--- .../source/spi/DataChangeEventListener.java | 14 +++++++--- 11 files changed, 95 insertions(+), 20 deletions(-) diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbTaskContext.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbTaskContext.java index 0259feabd..3c11b3b94 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbTaskContext.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbTaskContext.java @@ -5,6 +5,8 @@ */ package io.debezium.connector.mongodb; +import java.util.Collections; + import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; import io.debezium.connector.common.CdcSourceTaskContext; @@ -28,7 +30,7 @@ public class MongoDbTaskContext extends CdcSourceTaskContext { * @param config the configuration */ public MongoDbTaskContext(Configuration config) { - super("MongoDB", config.getString(MongoDbConnectorConfig.LOGICAL_NAME)); + super("MongoDB", config.getString(MongoDbConnectorConfig.LOGICAL_NAME), Collections::emptySet); final String serverName = config.getString(MongoDbConnectorConfig.LOGICAL_NAME); this.filters = new Filters(config); diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java index fb80759ae..8aa577bb9 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java @@ -5,6 +5,7 @@ */ package io.debezium.connector.mysql; +import java.util.Collections; import java.util.Map; import java.util.function.Predicate; @@ -53,7 +54,8 @@ public MySqlTaskContext(Configuration config) { } public MySqlTaskContext(Configuration config, Boolean tableIdCaseInsensitive) { - super("MySQL", config.getString(MySqlConnectorConfig.SERVER_NAME)); + // MySQL now calculates JMX binlog reader metrics on its own + super("MySQL", config.getString(MySqlConnectorConfig.SERVER_NAME), Collections::emptyList); this.config = config; this.connectorConfig = new MySqlConnectorConfig(config); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java index e38f9f6a1..e8d948b2c 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java @@ -7,6 +7,7 @@ package io.debezium.connector.postgresql; import java.sql.SQLException; +import java.util.Collections; import io.debezium.annotation.ThreadSafe; import io.debezium.connector.common.CdcSourceTaskContext; @@ -29,7 +30,7 @@ public class PostgresTaskContext extends CdcSourceTaskContext { private final PostgresSchema schema; protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, TopicSelector topicSelector) { - super("Postgres", config.getLogicalName()); + super("Postgres", config.getLogicalName(), Collections::emptySet); this.config = config; this.topicSelector = topicSelector; diff --git a/debezium-core/src/main/java/io/debezium/connector/common/CdcSourceTaskContext.java b/debezium-core/src/main/java/io/debezium/connector/common/CdcSourceTaskContext.java index aa6cb8e9d..8eb5aeb2f 100644 --- a/debezium-core/src/main/java/io/debezium/connector/common/CdcSourceTaskContext.java +++ b/debezium-core/src/main/java/io/debezium/connector/common/CdcSourceTaskContext.java @@ -5,11 +5,15 @@ */ package io.debezium.connector.common; +import java.util.Collection; +import java.util.function.Supplier; + import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import org.apache.kafka.connect.source.SourceTask; +import io.debezium.schema.DataCollectionId; import io.debezium.util.Clock; import io.debezium.util.LoggingContext; @@ -20,13 +24,17 @@ */ public class CdcSourceTaskContext { + private static final String[] EMPTY_CAPTURED_LIST = new String[0]; + private final String connectorType; private final String connectorName; private final Clock clock; + private final Supplier> collectionsSupplier; - public CdcSourceTaskContext(String connectorType, String connectorName) { + public CdcSourceTaskContext(String connectorType, String connectorName, Supplier> collectionsSupplier) { this.connectorType = connectorType; this.connectorName = connectorName; + this.collectionsSupplier = collectionsSupplier; this.clock = Clock.system(); } @@ -58,4 +66,20 @@ public Clock getClock() { public ObjectName metricName(String contextName) throws MalformedObjectNameException { return new ObjectName("debezium." + connectorType.toLowerCase() + ":type=connector-metrics,context=" + contextName + ",server=" + connectorName); } + + public String[] capturedDataCollections() { + if (collectionsSupplier == null) { + return EMPTY_CAPTURED_LIST; + } + final Collection collections = collectionsSupplier.get(); + if (collections == null) { + return EMPTY_CAPTURED_LIST; + } + String[] ret = new String[collections.size()]; + int i = 0; + for (DataCollectionId collection: collections) { + ret[i++] = collection.toString(); + } + return ret; + } } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java b/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java index f83553ae2..2e91594d7 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java @@ -80,6 +80,7 @@ public synchronized void start(T taskContext) { if (running && snapshotResult.getStatus() == SnapshotResultStatus.COMPLETED) { streamingSource = changeEventSourceFactory.getStreamingChangeEventSource(snapshotResult.getOffset()); eventDispatcher.setEventListener(streamingMetrics); + streamingMetrics.connected(true); streamingSource.execute(context); } } @@ -90,6 +91,9 @@ public synchronized void start(T taskContext) { catch (Exception e) { errorHandler.setProducerThrowable(e); } + finally { + streamingMetrics.connected(false); + } }); } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java b/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java index 89541c055..0fed83734 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java @@ -21,6 +21,7 @@ import io.debezium.pipeline.source.spi.DataChangeEventListener; import io.debezium.pipeline.spi.ChangeEventCreator; import io.debezium.pipeline.spi.ChangeRecordEmitter; +import io.debezium.pipeline.spi.ChangeRecordEmitter.Receiver; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.SchemaChangeEventEmitter; import io.debezium.schema.DataCollectionFilters.DataCollectionFilter; @@ -81,7 +82,6 @@ public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector t public void dispatchSnapshotEvent(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter, SnapshotReceiver receiver) throws InterruptedException { // TODO Handle Heartbeat - eventListener.onEvent(); DataCollectionSchema dataCollectionSchema = schema.schemaFor(dataCollectionId); @@ -90,7 +90,15 @@ public void dispatchSnapshotEvent(T dataCollectionId, ChangeRecordEmitter change throw new IllegalArgumentException("No metadata registered for captured table " + dataCollectionId); } - changeRecordEmitter.emitChangeRecords(dataCollectionSchema, receiver); + changeRecordEmitter.emitChangeRecords(dataCollectionSchema, new Receiver() { + + @Override + public void changeRecord(DataCollectionSchema schema, Operation operation, Object key, Struct value, + OffsetContext offset) throws InterruptedException { + eventListener.onEvent("source = " + dataCollectionId + ", id = " + key + ", offset = " + offset); + receiver.changeRecord(dataCollectionSchema, operation, key, value, offset); + } + }); } public SnapshotReceiver getSnapshotChangeEventReceiver() { @@ -105,9 +113,9 @@ public SnapshotReceiver getSnapshotChangeEventReceiver() { * {@link ChangeEventCreator} for converting them into data change events. */ public void dispatchDataChangeEvent(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter) throws InterruptedException { - eventListener.onEvent(); if(!filter.isIncluded(dataCollectionId)) { + eventListener.onSkippedEvent("source = " + dataCollectionId); LOGGER.trace("Skipping data change event for {}", dataCollectionId); } else { @@ -118,7 +126,15 @@ public void dispatchDataChangeEvent(T dataCollectionId, ChangeRecordEmitter chan throw new IllegalArgumentException("No metadata registered for captured table " + dataCollectionId); } - changeRecordEmitter.emitChangeRecords(dataCollectionSchema, streamingReceiver); + changeRecordEmitter.emitChangeRecords(dataCollectionSchema, new Receiver() { + + @Override + public void changeRecord(DataCollectionSchema schema, Operation operation, Object key, Struct value, + OffsetContext offset) throws InterruptedException { + eventListener.onEvent("operation = " + operation + ", source = " + dataCollectionId + ", id = " + key + ", offset = " + offset); + streamingReceiver.changeRecord(schema, operation, key, value, offset); + } + }); } heartbeat.heartbeat( diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/ChangeEventSourceMetricsMXBean.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/ChangeEventSourceMetricsMXBean.java index f6de95d1d..38055660b 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/metrics/ChangeEventSourceMetricsMXBean.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/ChangeEventSourceMetricsMXBean.java @@ -15,6 +15,7 @@ public interface ChangeEventSourceMetricsMXBean { String getLastEvent(); long getMilliSecondsSinceLastEvent(); long getTotalNumberOfEventsSeen(); + long getNumberOfEventsSkipped(); String[] getMonitoredTables(); void reset(); } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/Metrics.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/Metrics.java index e5f6ab4cd..f0414cd7b 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/metrics/Metrics.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/Metrics.java @@ -28,12 +28,14 @@ public abstract class Metrics implements DataChangeEventListener, ChangeEventSourceMetricsMXBean { protected final AtomicLong totalNumberOfEventsSeen = new AtomicLong(); + protected final AtomicLong numberOfEventsSkipped = new AtomicLong(); protected final AtomicLong lastEventTimestamp = new AtomicLong(-1); private final String contextName; protected final Clock clock; - private final CdcSourceTaskContext taskContext; + protected final CdcSourceTaskContext taskContext; private volatile ObjectName name; + private volatile String lastEvent; protected Metrics(T taskContext, String contextName) { this.contextName = contextName; @@ -76,15 +78,25 @@ public final void unregister(Logger logger) { } @Override - public void onEvent() { + public void onEvent(String event) { + lastEvent = event; + updateCommonEventMetrics(); + } + + private void updateCommonEventMetrics() { totalNumberOfEventsSeen.incrementAndGet(); lastEventTimestamp.set(clock.currentTimeInMillis()); } - // TODO DBZ-978 + @Override + public void onSkippedEvent(String event) { + numberOfEventsSkipped.incrementAndGet(); + updateCommonEventMetrics(); + } + @Override public String getLastEvent() { - return "not implemented"; + return lastEvent; } @Override @@ -97,6 +109,11 @@ public long getTotalNumberOfEventsSeen() { return totalNumberOfEventsSeen.get(); } + @Override + public long getNumberOfEventsSkipped() { + return numberOfEventsSkipped.get(); + } + @Override public void reset() { totalNumberOfEventsSeen.set(0); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/SnapshotChangeEventSourceMetrics.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/SnapshotChangeEventSourceMetrics.java index cdd592ca8..71cab8f45 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/metrics/SnapshotChangeEventSourceMetrics.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/SnapshotChangeEventSourceMetrics.java @@ -34,10 +34,8 @@ public class SnapshotChangeEventSourceMetrics extends Metrics implements Snapsho private final AtomicLong stopTime = new AtomicLong(); private final ConcurrentMap rowsScanned = new ConcurrentHashMap(); - // TODO DBZ-978 what's the purpose of the value here? It's never updated. private final ConcurrentMap remainingTables = new ConcurrentHashMap<>(); - // TODO DBZ-978 Pull up to Metrics private final Set monitoredTables = Collections.synchronizedSet(new HashSet<>()); public SnapshotChangeEventSourceMetrics(T taskContext) { diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/StreamingChangeEventSourceMetrics.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/StreamingChangeEventSourceMetrics.java index 0365f5419..030ae0886 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/metrics/StreamingChangeEventSourceMetrics.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/StreamingChangeEventSourceMetrics.java @@ -17,7 +17,6 @@ @ThreadSafe public class StreamingChangeEventSourceMetrics extends Metrics implements StreamingChangeEventSourceMetricsMXBean, DataChangeEventListener { - // DBZ-978 Toggle when losing the connection private final AtomicBoolean connected = new AtomicBoolean(); public StreamingChangeEventSourceMetrics(T taskContext) { @@ -29,9 +28,12 @@ public boolean isConnected() { return this.connected.get(); } - // TODO DBZ-978 @Override public String[] getMonitoredTables() { - return new String[] {}; + return taskContext.capturedDataCollections(); + } + + public void connected(boolean connected) { + this.connected.set(connected); } } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/DataChangeEventListener.java b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/DataChangeEventListener.java index c695ec6a8..23a20bd78 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/DataChangeEventListener.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/DataChangeEventListener.java @@ -15,8 +15,16 @@ */ public interface DataChangeEventListener { - // TODO DBZ-978 pass representation of incoming event - void onEvent(); + void onEvent(String event); + void onSkippedEvent(String event); - static DataChangeEventListener NO_OP = () -> {}; + static DataChangeEventListener NO_OP = new DataChangeEventListener() { + @Override + public void onSkippedEvent(String event) { + } + + @Override + public void onEvent(String event) { + } + }; }