From 57af80afd57a3ecbc299f3be174e85b175e9af1b Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Tue, 7 Apr 2020 13:42:45 -0400 Subject: [PATCH] DBZ-1859 Enhance MongoDB connector metrics --- .../connector/mongodb/DisconnectEvent.java | 16 ++++++ ...ongoDbChangeEventSourceMetricsFactory.java | 32 +++++++++++ .../mongodb/MongoDbConnectorTask.java | 1 + .../MongoDbSnapshotChangeEventSource.java | 1 + ...ngoDbSnapshotChangeEventSourceMetrics.java | 45 +++++++++++++++ ...SnapshotChangeEventSourceMetricsMBean.java | 17 ++++++ .../MongoDbStreamingChangeEventSource.java | 12 ++-- ...goDbStreamingChangeEventSourceMetrics.java | 55 +++++++++++++++++++ ...treamingChangeEventSourceMetricsMBean.java | 20 +++++++ .../mongodb/PrimaryElectionEvent.java | 28 ++++++++++ .../connector/mongodb/MongoMetricsIT.java | 8 +++ .../postgresql/PostgresConnectorTask.java | 2 + .../sqlserver/SqlServerConnectorTask.java | 2 + .../ChangeEventSourceCoordinator.java | 10 +++- .../io/debezium/pipeline/EventDispatcher.java | 4 ++ .../io/debezium/pipeline/MetadataEvent.java | 15 +++++ ...efaultChangeEventSourceMetricsFactory.java | 30 ++++++++++ .../pipeline/metrics/PipelineMetrics.java | 5 ++ .../StreamingChangeEventSourceMetrics.java | 5 ++ .../spi/ChangeEventSourceMetricsFactory.java | 50 +++++++++++++++++ .../source/spi/DataChangeEventListener.java | 11 ++++ 21 files changed, 362 insertions(+), 7 deletions(-) create mode 100644 debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/DisconnectEvent.java create mode 100644 debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbChangeEventSourceMetricsFactory.java create mode 100644 debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSourceMetrics.java create mode 100644 debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSourceMetricsMBean.java create mode 100644 debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSourceMetrics.java create mode 100644 debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSourceMetricsMBean.java create mode 100644 debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/PrimaryElectionEvent.java create mode 100644 debezium-core/src/main/java/io/debezium/pipeline/MetadataEvent.java create mode 100644 debezium-core/src/main/java/io/debezium/pipeline/metrics/DefaultChangeEventSourceMetricsFactory.java create mode 100644 debezium-core/src/main/java/io/debezium/pipeline/metrics/spi/ChangeEventSourceMetricsFactory.java diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/DisconnectEvent.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/DisconnectEvent.java new file mode 100644 index 000000000..80bb5c94c --- /dev/null +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/DisconnectEvent.java @@ -0,0 +1,16 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mongodb; + +import io.debezium.pipeline.MetadataEvent; + +/** + * An event that implies that a connection was lost or with the source database. + * + * @author Chris Cranford + */ +public class DisconnectEvent implements MetadataEvent { +} diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbChangeEventSourceMetricsFactory.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbChangeEventSourceMetricsFactory.java new file mode 100644 index 000000000..7ccaac001 --- /dev/null +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbChangeEventSourceMetricsFactory.java @@ -0,0 +1,32 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mongodb; + +import io.debezium.connector.base.ChangeEventQueueMetrics; +import io.debezium.connector.common.CdcSourceTaskContext; +import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory; +import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics; +import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics; +import io.debezium.pipeline.source.spi.EventMetadataProvider; + +/** + * @author Chris Cranford + */ +public class MongoDbChangeEventSourceMetricsFactory extends DefaultChangeEventSourceMetricsFactory { + @Override + public SnapshotChangeEventSourceMetrics getSnapshotMetrics(T taskContext, + ChangeEventQueueMetrics changeEventQueueMetrics, + EventMetadataProvider eventMetadataProvider) { + return new MongoDbSnapshotChangeEventSourceMetrics(taskContext, changeEventQueueMetrics, eventMetadataProvider); + } + + @Override + public StreamingChangeEventSourceMetrics getStreamingMetrics(T taskContext, + ChangeEventQueueMetrics changeEventQueueMetrics, + EventMetadataProvider eventMetadataProvider) { + return new MongoDbStreamingChangeEventSourceMetrics(taskContext, changeEventQueueMetrics, eventMetadataProvider); + } +} diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorTask.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorTask.java index 0da991f74..0da2abe38 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorTask.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorTask.java @@ -110,6 +110,7 @@ public ChangeEventSourceCoordinator start(Configuration config) { clock, replicaSets, taskContext), + new MongoDbChangeEventSourceMetricsFactory(), dispatcher, schema); diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java index 8ad90c4e4..917166372 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java @@ -231,6 +231,7 @@ private MongoPrimary establishConnectionToPrimary(ReplicaSet replicaSet) { throw new ConnectException("Error while attempting to " + desc, error); } else { + dispatcher.dispatchMetadataEvent(new DisconnectEvent()); LOGGER.error("Error while attempting to {}: ", desc, error.getMessage(), error); throw new ConnectException("Error while attempting to " + desc, error); } diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSourceMetrics.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSourceMetrics.java new file mode 100644 index 000000000..58493372c --- /dev/null +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSourceMetrics.java @@ -0,0 +1,45 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mongodb; + +import java.util.concurrent.atomic.AtomicLong; + +import io.debezium.connector.base.ChangeEventQueueMetrics; +import io.debezium.connector.common.CdcSourceTaskContext; +import io.debezium.pipeline.MetadataEvent; +import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics; +import io.debezium.pipeline.source.spi.EventMetadataProvider; + +/** + * @author Chris Cranford + */ +public class MongoDbSnapshotChangeEventSourceMetrics extends SnapshotChangeEventSourceMetrics implements MongoDbSnapshotChangeEventSourceMetricsMBean { + + private AtomicLong numberOfDisconnects = new AtomicLong(); + + public MongoDbSnapshotChangeEventSourceMetrics(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics, + EventMetadataProvider metadataProvider) { + super(taskContext, changeEventQueueMetrics, metadataProvider); + } + + @Override + public long getNumberOfDisconnects() { + return numberOfDisconnects.get(); + } + + @Override + public void onMetadataEvent(MetadataEvent event) { + if (event instanceof DisconnectEvent) { + numberOfDisconnects.incrementAndGet(); + } + } + + @Override + public void reset() { + super.reset(); + numberOfDisconnects.set(0); + } +} diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSourceMetricsMBean.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSourceMetricsMBean.java new file mode 100644 index 000000000..43cd33f50 --- /dev/null +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSourceMetricsMBean.java @@ -0,0 +1,17 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mongodb; + +import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetricsMXBean; + +/** + * Extended metrics exposed by the MongoDB connector during snapshot. + * + * @author Chris Cranford + */ +public interface MongoDbSnapshotChangeEventSourceMetricsMBean extends SnapshotChangeEventSourceMetricsMXBean { + long getNumberOfDisconnects(); +} diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java index 1dd5b13ea..d9587f1a4 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java @@ -135,6 +135,7 @@ private MongoPrimary establishConnectionToPrimary(ReplicaSet replicaSet) { throw new ConnectException("Error while attempting to " + desc, error); } else { + dispatcher.dispatchMetadataEvent(new DisconnectEvent()); LOGGER.error("Error while attempting to {}: {}", desc, error.getMessage(), error); throw new ConnectException("Error while attempting to " + desc, error); } @@ -189,7 +190,7 @@ private void readOplog(MongoClient primary, MongoPrimary primaryClient, ReplicaS // In this situation if not document is available, we'll pause. final Document event = cursor.tryNext(); if (event != null) { - if (!handleOplogEvent(primaryAddress, event, event, 0, oplogContext)) { + if (!handleOplogEvent(primaryAddress, event, event, 0, oplogContext, context)) { // Something happened and we are supposed to stop reading return; } @@ -238,7 +239,8 @@ private Bson getSkippedOperationsFilter() { return skippedOperationsFilter; } - private boolean handleOplogEvent(ServerAddress primaryAddress, Document event, Document masterEvent, long txOrder, ReplicaSetOplogContext oplogContext) { + private boolean handleOplogEvent(ServerAddress primaryAddress, Document event, Document masterEvent, long txOrder, ReplicaSetOplogContext oplogContext, + ChangeEventSourceContext context) { String ns = event.getString("ns"); Document object = event.get(OBJECT_FIELD, Document.class); if (Objects.isNull(object)) { @@ -272,6 +274,8 @@ private boolean handleOplogEvent(ServerAddress primaryAddress, Document event, D LOGGER.info("Found new primary event in oplog, current {} is new primary. " + "Continue to process oplog event.", primaryAddress); } + + dispatcher.dispatchMetadataEvent(new PrimaryElectionEvent(serverAddress)); } // Otherwise ignore if (LOGGER.isDebugEnabled()) { @@ -290,7 +294,7 @@ private boolean handleOplogEvent(ServerAddress primaryAddress, Document event, D LOGGER.debug("Skipping record as it is expected to be already processed: {}", change); continue; } - final boolean r = handleOplogEvent(primaryAddress, change, event, txOrder, oplogContext); + final boolean r = handleOplogEvent(primaryAddress, change, event, txOrder, oplogContext, context); if (!r) { return false; } @@ -303,7 +307,7 @@ private boolean handleOplogEvent(ServerAddress primaryAddress, Document event, D final Long operationId = event.getLong(SourceInfo.OPERATION_ID); dispatcher.dispatchTransactionStartedEvent(Long.toString(operationId), oplogContext.getOffset()); for (Document change : txChanges) { - final boolean r = handleOplogEvent(primaryAddress, change, event, ++txOrder, oplogContext); + final boolean r = handleOplogEvent(primaryAddress, change, event, ++txOrder, oplogContext, context); if (!r) { return false; } diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSourceMetrics.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSourceMetrics.java new file mode 100644 index 000000000..210532607 --- /dev/null +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSourceMetrics.java @@ -0,0 +1,55 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mongodb; + +import java.util.concurrent.atomic.AtomicLong; + +import io.debezium.connector.base.ChangeEventQueueMetrics; +import io.debezium.connector.common.CdcSourceTaskContext; +import io.debezium.pipeline.MetadataEvent; +import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics; +import io.debezium.pipeline.source.spi.EventMetadataProvider; + +/** + * @author Chris Cranford + */ +public class MongoDbStreamingChangeEventSourceMetrics extends StreamingChangeEventSourceMetrics implements MongoDbStreamingChangeEventSourceMetricsMBean { + + private AtomicLong numberOfPrimaryElections = new AtomicLong(); + private AtomicLong numberOfDisconnects = new AtomicLong(); + + MongoDbStreamingChangeEventSourceMetrics(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics, + EventMetadataProvider eventMetadataProvider) { + super(taskContext, changeEventQueueMetrics, eventMetadataProvider); + } + + @Override + public long getNumberOfPrimaryElections() { + return numberOfPrimaryElections.get(); + } + + @Override + public long getNumberOfDisconnects() { + return numberOfDisconnects.get(); + } + + @Override + public void onMetadataEvent(MetadataEvent event) { + if (event instanceof PrimaryElectionEvent) { + numberOfPrimaryElections.incrementAndGet(); + } + else if (event instanceof DisconnectEvent) { + numberOfDisconnects.incrementAndGet(); + } + } + + @Override + public void reset() { + super.reset(); + this.numberOfPrimaryElections.set(0); + this.numberOfDisconnects.set(0); + } +} diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSourceMetricsMBean.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSourceMetricsMBean.java new file mode 100644 index 000000000..7776156c7 --- /dev/null +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSourceMetricsMBean.java @@ -0,0 +1,20 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mongodb; + +import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetricsMXBean; + +/** + * Extended metrics exposed by the MongoDB connector during streaming. + * + * @author Chris Cranford + */ +public interface MongoDbStreamingChangeEventSourceMetricsMBean extends StreamingChangeEventSourceMetricsMXBean { + + long getNumberOfDisconnects(); + + long getNumberOfPrimaryElections(); +} diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/PrimaryElectionEvent.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/PrimaryElectionEvent.java new file mode 100644 index 000000000..b1d6063d8 --- /dev/null +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/PrimaryElectionEvent.java @@ -0,0 +1,28 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mongodb; + +import com.mongodb.ServerAddress; + +import io.debezium.pipeline.MetadataEvent; + +/** + * A metadata event that signals that a primary election event was detected. + * + * @author Chris Cranford + */ +public class PrimaryElectionEvent implements MetadataEvent { + + private final ServerAddress primaryAddress; + + public PrimaryElectionEvent(ServerAddress primaryAddress) { + this.primaryAddress = primaryAddress; + } + + public ServerAddress getPrimaryAddress() { + return primaryAddress; + } +} diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoMetricsIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoMetricsIT.java index bd8af1c8d..91051929d 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoMetricsIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoMetricsIT.java @@ -11,6 +11,7 @@ import java.lang.management.ManagementFactory; import javax.management.InstanceNotFoundException; +import javax.management.MBeanAttributeInfo; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -99,6 +100,7 @@ public void testSnapshotOnlyMetrics() throws Exception { assertThat(mBeanServer.getAttribute(objectName, "NumberOfErroneousEvents")).isEqualTo(0L); assertThat(mBeanServer.getAttribute(objectName, "MonitoredTables")).isEqualTo(new String[]{ "rs0.dbit.restaurants" }); assertThat(mBeanServer.getAttribute(objectName, "LastEvent")).isNotNull(); + assertThat(mBeanServer.getAttribute(objectName, "NumberOfDisconnects")).isEqualTo(0L); } @Test @@ -128,6 +130,10 @@ public void testStreamingOnlyMetrics() throws Exception { final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); final ObjectName objectName = getStreamingMetricsObjectName("mongodb", "mongo1"); + for (MBeanAttributeInfo info : mBeanServer.getMBeanInfo(objectName).getAttributes()) { + System.out.println(info.getName()); + } + assertThat(mBeanServer.getAttribute(objectName, "SourceEventPosition")).isNotNull(); assertThat(mBeanServer.getAttribute(objectName, "NumberOfCommittedTransactions")).isEqualTo(6L); assertThat(mBeanServer.getAttribute(objectName, "LastTransactionId")).isNotNull(); @@ -139,5 +145,7 @@ public void testStreamingOnlyMetrics() throws Exception { assertThat(mBeanServer.getAttribute(objectName, "NumberOfErroneousEvents")).isEqualTo(0L); assertThat((Long) mBeanServer.getAttribute(objectName, "MilliSecondsSinceLastEvent")).isGreaterThanOrEqualTo(0); assertThat((Long) mBeanServer.getAttribute(objectName, "MilliSecondsBehindSource")).isGreaterThanOrEqualTo(0); + assertThat(mBeanServer.getAttribute(objectName, "NumberOfDisconnects")).isEqualTo(0L); + assertThat(mBeanServer.getAttribute(objectName, "NumberOfPrimaryElections")).isEqualTo(0L); } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java index e30a2dcac..22a7d6f81 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java @@ -31,6 +31,7 @@ import io.debezium.pipeline.DataChangeEvent; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.EventDispatcher; +import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory; import io.debezium.relational.TableId; import io.debezium.schema.TopicSelector; import io.debezium.util.Clock; @@ -162,6 +163,7 @@ public ChangeEventSourceCoordinator start(Configuration config) { taskContext, replicationConnection, slotCreatedInfo), + new DefaultChangeEventSourceMetricsFactory(), dispatcher, schema); diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java index 534f1e466..5994c9e1a 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java @@ -22,6 +22,7 @@ import io.debezium.pipeline.DataChangeEvent; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.EventDispatcher; +import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig; import io.debezium.relational.TableId; @@ -116,6 +117,7 @@ public ChangeEventSourceCoordinator start(Configuration config) { SqlServerConnector.class, connectorConfig, new SqlServerChangeEventSourceFactory(connectorConfig, dataConnection, metadataConnection, errorHandler, dispatcher, clock, schema), + new DefaultChangeEventSourceMetricsFactory(), dispatcher, schema); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java b/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java index 49936fe7f..0955c78f2 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java @@ -20,6 +20,7 @@ import io.debezium.connector.common.CdcSourceTaskContext; import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics; import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics; +import io.debezium.pipeline.metrics.spi.ChangeEventSourceMetricsFactory; import io.debezium.pipeline.source.spi.ChangeEventSource; import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext; import io.debezium.pipeline.source.spi.ChangeEventSourceFactory; @@ -47,6 +48,7 @@ public class ChangeEventSourceCoordinator { private final OffsetContext previousOffset; private final ErrorHandler errorHandler; private final ChangeEventSourceFactory changeEventSourceFactory; + private final ChangeEventSourceMetricsFactory changeEventSourceMetricsFactory; private final ExecutorService executor; private final EventDispatcher eventDispatcher; private final DatabaseSchema schema; @@ -59,10 +61,12 @@ public class ChangeEventSourceCoordinator { public ChangeEventSourceCoordinator(OffsetContext previousOffset, ErrorHandler errorHandler, Class connectorType, CommonConnectorConfig connectorConfig, - ChangeEventSourceFactory changeEventSourceFactory, EventDispatcher eventDispatcher, DatabaseSchema schema) { + ChangeEventSourceFactory changeEventSourceFactory, + ChangeEventSourceMetricsFactory changeEventSourceMetricsFactory, EventDispatcher eventDispatcher, DatabaseSchema schema) { this.previousOffset = previousOffset; this.errorHandler = errorHandler; this.changeEventSourceFactory = changeEventSourceFactory; + this.changeEventSourceMetricsFactory = changeEventSourceMetricsFactory; this.executor = Threads.newSingleThreadExecutor(connectorType, connectorConfig.getLogicalName(), "change-event-source-coordinator"); this.eventDispatcher = eventDispatcher; this.schema = schema; @@ -70,8 +74,8 @@ public ChangeEventSourceCoordinator(OffsetContext previousOffset, ErrorHandler e public synchronized void start(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics, EventMetadataProvider metadataProvider) { - this.snapshotMetrics = new SnapshotChangeEventSourceMetrics(taskContext, changeEventQueueMetrics, metadataProvider); - this.streamingMetrics = new StreamingChangeEventSourceMetrics(taskContext, changeEventQueueMetrics, metadataProvider); + this.snapshotMetrics = changeEventSourceMetricsFactory.getSnapshotMetrics(taskContext, changeEventQueueMetrics, metadataProvider); + this.streamingMetrics = changeEventSourceMetricsFactory.getStreamingMetrics(taskContext, changeEventQueueMetrics, metadataProvider); running = true; // run the snapshot source on a separate thread so start() won't block diff --git a/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java b/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java index f4ac366c2..3fb993c62 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java @@ -219,6 +219,10 @@ public void dispatchTransactionStartedEvent(String transactionId, OffsetContext transactionMonitor.transactionStartedEvent(transactionId, offset); } + public void dispatchMetadataEvent(MetadataEvent event) { + eventListener.onMetadataEvent(event); + } + public Optional errorOnMissingSchema(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter) { eventListener.onErroneousEvent("source = " + dataCollectionId); throw new IllegalArgumentException("No metadata registered for captured table " + dataCollectionId); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/MetadataEvent.java b/debezium-core/src/main/java/io/debezium/pipeline/MetadataEvent.java new file mode 100644 index 000000000..b05ab2ab5 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/pipeline/MetadataEvent.java @@ -0,0 +1,15 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.pipeline; + +/** + * Contract that describes a metadata event isn't necessarily dispatched into the change event stream but + * is potentially of interest to other parts of the framework such as for capture by metrics. + * + * @author Chris Cranford + */ +public interface MetadataEvent { +} diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/DefaultChangeEventSourceMetricsFactory.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/DefaultChangeEventSourceMetricsFactory.java new file mode 100644 index 000000000..75765e206 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/DefaultChangeEventSourceMetricsFactory.java @@ -0,0 +1,30 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.pipeline.metrics; + +import io.debezium.connector.base.ChangeEventQueueMetrics; +import io.debezium.connector.common.CdcSourceTaskContext; +import io.debezium.pipeline.metrics.spi.ChangeEventSourceMetricsFactory; +import io.debezium.pipeline.source.spi.EventMetadataProvider; + +/** + * @author Chris Cranford + */ +public class DefaultChangeEventSourceMetricsFactory implements ChangeEventSourceMetricsFactory { + @Override + public SnapshotChangeEventSourceMetrics getSnapshotMetrics(T taskContext, + ChangeEventQueueMetrics changeEventQueueMetrics, + EventMetadataProvider eventMetadataProvider) { + return new SnapshotChangeEventSourceMetrics(taskContext, changeEventQueueMetrics, eventMetadataProvider); + } + + @Override + public StreamingChangeEventSourceMetrics getStreamingMetrics(T taskContext, + ChangeEventQueueMetrics changeEventQueueMetrics, + EventMetadataProvider eventMetadataProvider) { + return new StreamingChangeEventSourceMetrics(taskContext, changeEventQueueMetrics, eventMetadataProvider); + } +} diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/PipelineMetrics.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/PipelineMetrics.java index a7d57a647..44e65feb2 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/metrics/PipelineMetrics.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/PipelineMetrics.java @@ -13,6 +13,7 @@ import io.debezium.connector.base.ChangeEventQueueMetrics; import io.debezium.connector.common.CdcSourceTaskContext; import io.debezium.metrics.Metrics; +import io.debezium.pipeline.MetadataEvent; import io.debezium.pipeline.source.spi.DataChangeEventListener; import io.debezium.pipeline.source.spi.EventMetadataProvider; import io.debezium.pipeline.spi.OffsetContext; @@ -70,6 +71,10 @@ public void onErroneousEvent(String event) { updateCommonEventMetrics(); } + @Override + public void onMetadataEvent(MetadataEvent event) { + } + @Override public String getLastEvent() { return lastEvent; diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/StreamingChangeEventSourceMetrics.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/StreamingChangeEventSourceMetrics.java index 5c7cebb09..eeba4945e 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/metrics/StreamingChangeEventSourceMetrics.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/StreamingChangeEventSourceMetrics.java @@ -20,6 +20,7 @@ import io.debezium.annotation.ThreadSafe; import io.debezium.connector.base.ChangeEventQueueMetrics; import io.debezium.connector.common.CdcSourceTaskContext; +import io.debezium.pipeline.MetadataEvent; import io.debezium.pipeline.source.spi.DataChangeEventListener; import io.debezium.pipeline.source.spi.EventMetadataProvider; import io.debezium.pipeline.spi.OffsetContext; @@ -98,6 +99,10 @@ public void onEvent(DataCollectionId source, OffsetContext offset, Object key, S } } + @Override + public void onMetadataEvent(MetadataEvent event) { + } + @Override public String getLastTransactionId() { return lastTransactionId.get(); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/spi/ChangeEventSourceMetricsFactory.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/spi/ChangeEventSourceMetricsFactory.java new file mode 100644 index 000000000..1fe1099e5 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/spi/ChangeEventSourceMetricsFactory.java @@ -0,0 +1,50 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.pipeline.metrics.spi; + +import io.debezium.connector.base.ChangeEventQueueMetrics; +import io.debezium.connector.common.CdcSourceTaskContext; +import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics; +import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics; +import io.debezium.pipeline.source.spi.EventMetadataProvider; + +/** + * A factory for creating {@link SnapshotChangeEventSourceMetrics} and {@link StreamingChangeEventSourceMetrics}. + * + * @author Chris Cranford + */ +public interface ChangeEventSourceMetricsFactory { + + /** + * Returns the snapshot change event source metrics. + * + * @param taskContext + * The task context + * @param changeEventQueueMetrics + * The change event queue metrics + * @param eventMetadataProvider + * The event metadata provider implementation + * + * @return a snapshot change event source metrics + */ + SnapshotChangeEventSourceMetrics getSnapshotMetrics(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics, + EventMetadataProvider eventMetadataProvider); + + /** + * Returns the streaming change event source metrics. + * + * @param taskContext + * The task context + * @param changeEventQueueMetrics + * The change event queue metrics + * @param eventMetadataProvider + * The event metadata provider implementation + * + * @return a streaming change event source metrics + */ + StreamingChangeEventSourceMetrics getStreamingMetrics(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics, + EventMetadataProvider eventMetadataProvider); +} diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/DataChangeEventListener.java b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/DataChangeEventListener.java index e0e31f15d..b49442a25 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/DataChangeEventListener.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/DataChangeEventListener.java @@ -8,6 +8,7 @@ import org.apache.kafka.connect.data.Struct; import io.debezium.pipeline.EventDispatcher; +import io.debezium.pipeline.MetadataEvent; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.schema.DataCollectionId; @@ -34,6 +35,12 @@ public interface DataChangeEventListener { */ void onErroneousEvent(String event); + /** + * Invoked for events that represent some metadata state change or event indicator. + * @param event + */ + void onMetadataEvent(MetadataEvent event); + static DataChangeEventListener NO_OP = new DataChangeEventListener() { @Override public void onFilteredEvent(String event) { @@ -43,6 +50,10 @@ public void onFilteredEvent(String event) { public void onErroneousEvent(String event) { } + @Override + public void onMetadataEvent(MetadataEvent event) { + } + @Override public void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value) { }