DBZ-1859 Enhance MongoDB connector metrics

This commit is contained in:
Chris Cranford 2020-04-07 13:42:45 -04:00 committed by Gunnar Morling
parent 6219376bfe
commit 57af80afd5
21 changed files with 362 additions and 7 deletions

View File

@ -0,0 +1,16 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import io.debezium.pipeline.MetadataEvent;
/**
* An event that implies that a connection was lost or with the source database.
*
* @author Chris Cranford
*/
public class DisconnectEvent implements MetadataEvent {
}

View File

@ -0,0 +1,32 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
/**
* @author Chris Cranford
*/
public class MongoDbChangeEventSourceMetricsFactory extends DefaultChangeEventSourceMetricsFactory {
@Override
public <T extends CdcSourceTaskContext> SnapshotChangeEventSourceMetrics getSnapshotMetrics(T taskContext,
ChangeEventQueueMetrics changeEventQueueMetrics,
EventMetadataProvider eventMetadataProvider) {
return new MongoDbSnapshotChangeEventSourceMetrics(taskContext, changeEventQueueMetrics, eventMetadataProvider);
}
@Override
public <T extends CdcSourceTaskContext> StreamingChangeEventSourceMetrics getStreamingMetrics(T taskContext,
ChangeEventQueueMetrics changeEventQueueMetrics,
EventMetadataProvider eventMetadataProvider) {
return new MongoDbStreamingChangeEventSourceMetrics(taskContext, changeEventQueueMetrics, eventMetadataProvider);
}
}

View File

@ -110,6 +110,7 @@ public ChangeEventSourceCoordinator start(Configuration config) {
clock,
replicaSets,
taskContext),
new MongoDbChangeEventSourceMetricsFactory(),
dispatcher,
schema);

View File

@ -231,6 +231,7 @@ private MongoPrimary establishConnectionToPrimary(ReplicaSet replicaSet) {
throw new ConnectException("Error while attempting to " + desc, error);
}
else {
dispatcher.dispatchMetadataEvent(new DisconnectEvent());
LOGGER.error("Error while attempting to {}: ", desc, error.getMessage(), error);
throw new ConnectException("Error while attempting to " + desc, error);
}

View File

@ -0,0 +1,45 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import java.util.concurrent.atomic.AtomicLong;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.pipeline.MetadataEvent;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
/**
* @author Chris Cranford
*/
public class MongoDbSnapshotChangeEventSourceMetrics extends SnapshotChangeEventSourceMetrics implements MongoDbSnapshotChangeEventSourceMetricsMBean {
private AtomicLong numberOfDisconnects = new AtomicLong();
public <T extends CdcSourceTaskContext> MongoDbSnapshotChangeEventSourceMetrics(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics,
EventMetadataProvider metadataProvider) {
super(taskContext, changeEventQueueMetrics, metadataProvider);
}
@Override
public long getNumberOfDisconnects() {
return numberOfDisconnects.get();
}
@Override
public void onMetadataEvent(MetadataEvent event) {
if (event instanceof DisconnectEvent) {
numberOfDisconnects.incrementAndGet();
}
}
@Override
public void reset() {
super.reset();
numberOfDisconnects.set(0);
}
}

View File

@ -0,0 +1,17 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetricsMXBean;
/**
* Extended metrics exposed by the MongoDB connector during snapshot.
*
* @author Chris Cranford
*/
public interface MongoDbSnapshotChangeEventSourceMetricsMBean extends SnapshotChangeEventSourceMetricsMXBean {
long getNumberOfDisconnects();
}

View File

@ -135,6 +135,7 @@ private MongoPrimary establishConnectionToPrimary(ReplicaSet replicaSet) {
throw new ConnectException("Error while attempting to " + desc, error);
}
else {
dispatcher.dispatchMetadataEvent(new DisconnectEvent());
LOGGER.error("Error while attempting to {}: {}", desc, error.getMessage(), error);
throw new ConnectException("Error while attempting to " + desc, error);
}
@ -189,7 +190,7 @@ private void readOplog(MongoClient primary, MongoPrimary primaryClient, ReplicaS
// In this situation if not document is available, we'll pause.
final Document event = cursor.tryNext();
if (event != null) {
if (!handleOplogEvent(primaryAddress, event, event, 0, oplogContext)) {
if (!handleOplogEvent(primaryAddress, event, event, 0, oplogContext, context)) {
// Something happened and we are supposed to stop reading
return;
}
@ -238,7 +239,8 @@ private Bson getSkippedOperationsFilter() {
return skippedOperationsFilter;
}
private boolean handleOplogEvent(ServerAddress primaryAddress, Document event, Document masterEvent, long txOrder, ReplicaSetOplogContext oplogContext) {
private boolean handleOplogEvent(ServerAddress primaryAddress, Document event, Document masterEvent, long txOrder, ReplicaSetOplogContext oplogContext,
ChangeEventSourceContext context) {
String ns = event.getString("ns");
Document object = event.get(OBJECT_FIELD, Document.class);
if (Objects.isNull(object)) {
@ -272,6 +274,8 @@ private boolean handleOplogEvent(ServerAddress primaryAddress, Document event, D
LOGGER.info("Found new primary event in oplog, current {} is new primary. " +
"Continue to process oplog event.", primaryAddress);
}
dispatcher.dispatchMetadataEvent(new PrimaryElectionEvent(serverAddress));
}
// Otherwise ignore
if (LOGGER.isDebugEnabled()) {
@ -290,7 +294,7 @@ private boolean handleOplogEvent(ServerAddress primaryAddress, Document event, D
LOGGER.debug("Skipping record as it is expected to be already processed: {}", change);
continue;
}
final boolean r = handleOplogEvent(primaryAddress, change, event, txOrder, oplogContext);
final boolean r = handleOplogEvent(primaryAddress, change, event, txOrder, oplogContext, context);
if (!r) {
return false;
}
@ -303,7 +307,7 @@ private boolean handleOplogEvent(ServerAddress primaryAddress, Document event, D
final Long operationId = event.getLong(SourceInfo.OPERATION_ID);
dispatcher.dispatchTransactionStartedEvent(Long.toString(operationId), oplogContext.getOffset());
for (Document change : txChanges) {
final boolean r = handleOplogEvent(primaryAddress, change, event, ++txOrder, oplogContext);
final boolean r = handleOplogEvent(primaryAddress, change, event, ++txOrder, oplogContext, context);
if (!r) {
return false;
}

View File

@ -0,0 +1,55 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import java.util.concurrent.atomic.AtomicLong;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.pipeline.MetadataEvent;
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
/**
* @author Chris Cranford
*/
public class MongoDbStreamingChangeEventSourceMetrics extends StreamingChangeEventSourceMetrics implements MongoDbStreamingChangeEventSourceMetricsMBean {
private AtomicLong numberOfPrimaryElections = new AtomicLong();
private AtomicLong numberOfDisconnects = new AtomicLong();
<T extends CdcSourceTaskContext> MongoDbStreamingChangeEventSourceMetrics(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics,
EventMetadataProvider eventMetadataProvider) {
super(taskContext, changeEventQueueMetrics, eventMetadataProvider);
}
@Override
public long getNumberOfPrimaryElections() {
return numberOfPrimaryElections.get();
}
@Override
public long getNumberOfDisconnects() {
return numberOfDisconnects.get();
}
@Override
public void onMetadataEvent(MetadataEvent event) {
if (event instanceof PrimaryElectionEvent) {
numberOfPrimaryElections.incrementAndGet();
}
else if (event instanceof DisconnectEvent) {
numberOfDisconnects.incrementAndGet();
}
}
@Override
public void reset() {
super.reset();
this.numberOfPrimaryElections.set(0);
this.numberOfDisconnects.set(0);
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetricsMXBean;
/**
* Extended metrics exposed by the MongoDB connector during streaming.
*
* @author Chris Cranford
*/
public interface MongoDbStreamingChangeEventSourceMetricsMBean extends StreamingChangeEventSourceMetricsMXBean {
long getNumberOfDisconnects();
long getNumberOfPrimaryElections();
}

View File

@ -0,0 +1,28 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import com.mongodb.ServerAddress;
import io.debezium.pipeline.MetadataEvent;
/**
* A metadata event that signals that a primary election event was detected.
*
* @author Chris Cranford
*/
public class PrimaryElectionEvent implements MetadataEvent {
private final ServerAddress primaryAddress;
public PrimaryElectionEvent(ServerAddress primaryAddress) {
this.primaryAddress = primaryAddress;
}
public ServerAddress getPrimaryAddress() {
return primaryAddress;
}
}

View File

@ -11,6 +11,7 @@
import java.lang.management.ManagementFactory;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@ -99,6 +100,7 @@ public void testSnapshotOnlyMetrics() throws Exception {
assertThat(mBeanServer.getAttribute(objectName, "NumberOfErroneousEvents")).isEqualTo(0L);
assertThat(mBeanServer.getAttribute(objectName, "MonitoredTables")).isEqualTo(new String[]{ "rs0.dbit.restaurants" });
assertThat(mBeanServer.getAttribute(objectName, "LastEvent")).isNotNull();
assertThat(mBeanServer.getAttribute(objectName, "NumberOfDisconnects")).isEqualTo(0L);
}
@Test
@ -128,6 +130,10 @@ public void testStreamingOnlyMetrics() throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
final ObjectName objectName = getStreamingMetricsObjectName("mongodb", "mongo1");
for (MBeanAttributeInfo info : mBeanServer.getMBeanInfo(objectName).getAttributes()) {
System.out.println(info.getName());
}
assertThat(mBeanServer.getAttribute(objectName, "SourceEventPosition")).isNotNull();
assertThat(mBeanServer.getAttribute(objectName, "NumberOfCommittedTransactions")).isEqualTo(6L);
assertThat(mBeanServer.getAttribute(objectName, "LastTransactionId")).isNotNull();
@ -139,5 +145,7 @@ public void testStreamingOnlyMetrics() throws Exception {
assertThat(mBeanServer.getAttribute(objectName, "NumberOfErroneousEvents")).isEqualTo(0L);
assertThat((Long) mBeanServer.getAttribute(objectName, "MilliSecondsSinceLastEvent")).isGreaterThanOrEqualTo(0);
assertThat((Long) mBeanServer.getAttribute(objectName, "MilliSecondsBehindSource")).isGreaterThanOrEqualTo(0);
assertThat(mBeanServer.getAttribute(objectName, "NumberOfDisconnects")).isEqualTo(0L);
assertThat(mBeanServer.getAttribute(objectName, "NumberOfPrimaryElections")).isEqualTo(0L);
}
}

View File

@ -31,6 +31,7 @@
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Clock;
@ -162,6 +163,7 @@ public ChangeEventSourceCoordinator start(Configuration config) {
taskContext,
replicationConnection,
slotCreatedInfo),
new DefaultChangeEventSourceMetricsFactory(),
dispatcher,
schema);

View File

@ -22,6 +22,7 @@
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
@ -116,6 +117,7 @@ public ChangeEventSourceCoordinator start(Configuration config) {
SqlServerConnector.class,
connectorConfig,
new SqlServerChangeEventSourceFactory(connectorConfig, dataConnection, metadataConnection, errorHandler, dispatcher, clock, schema),
new DefaultChangeEventSourceMetricsFactory(),
dispatcher,
schema);

View File

@ -20,6 +20,7 @@
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.spi.ChangeEventSourceMetricsFactory;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
@ -47,6 +48,7 @@ public class ChangeEventSourceCoordinator {
private final OffsetContext previousOffset;
private final ErrorHandler errorHandler;
private final ChangeEventSourceFactory changeEventSourceFactory;
private final ChangeEventSourceMetricsFactory changeEventSourceMetricsFactory;
private final ExecutorService executor;
private final EventDispatcher<?> eventDispatcher;
private final DatabaseSchema<?> schema;
@ -59,10 +61,12 @@ public class ChangeEventSourceCoordinator {
public ChangeEventSourceCoordinator(OffsetContext previousOffset, ErrorHandler errorHandler, Class<? extends SourceConnector> connectorType,
CommonConnectorConfig connectorConfig,
ChangeEventSourceFactory changeEventSourceFactory, EventDispatcher<?> eventDispatcher, DatabaseSchema<?> schema) {
ChangeEventSourceFactory changeEventSourceFactory,
ChangeEventSourceMetricsFactory changeEventSourceMetricsFactory, EventDispatcher<?> eventDispatcher, DatabaseSchema<?> schema) {
this.previousOffset = previousOffset;
this.errorHandler = errorHandler;
this.changeEventSourceFactory = changeEventSourceFactory;
this.changeEventSourceMetricsFactory = changeEventSourceMetricsFactory;
this.executor = Threads.newSingleThreadExecutor(connectorType, connectorConfig.getLogicalName(), "change-event-source-coordinator");
this.eventDispatcher = eventDispatcher;
this.schema = schema;
@ -70,8 +74,8 @@ public ChangeEventSourceCoordinator(OffsetContext previousOffset, ErrorHandler e
public synchronized <T extends CdcSourceTaskContext> void start(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics,
EventMetadataProvider metadataProvider) {
this.snapshotMetrics = new SnapshotChangeEventSourceMetrics(taskContext, changeEventQueueMetrics, metadataProvider);
this.streamingMetrics = new StreamingChangeEventSourceMetrics(taskContext, changeEventQueueMetrics, metadataProvider);
this.snapshotMetrics = changeEventSourceMetricsFactory.getSnapshotMetrics(taskContext, changeEventQueueMetrics, metadataProvider);
this.streamingMetrics = changeEventSourceMetricsFactory.getStreamingMetrics(taskContext, changeEventQueueMetrics, metadataProvider);
running = true;
// run the snapshot source on a separate thread so start() won't block

View File

@ -219,6 +219,10 @@ public void dispatchTransactionStartedEvent(String transactionId, OffsetContext
transactionMonitor.transactionStartedEvent(transactionId, offset);
}
public void dispatchMetadataEvent(MetadataEvent event) {
eventListener.onMetadataEvent(event);
}
public Optional<DataCollectionSchema> errorOnMissingSchema(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter) {
eventListener.onErroneousEvent("source = " + dataCollectionId);
throw new IllegalArgumentException("No metadata registered for captured table " + dataCollectionId);

View File

@ -0,0 +1,15 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.pipeline;
/**
* Contract that describes a metadata event isn't necessarily dispatched into the change event stream but
* is potentially of interest to other parts of the framework such as for capture by metrics.
*
* @author Chris Cranford
*/
public interface MetadataEvent {
}

View File

@ -0,0 +1,30 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.pipeline.metrics;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.pipeline.metrics.spi.ChangeEventSourceMetricsFactory;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
/**
* @author Chris Cranford
*/
public class DefaultChangeEventSourceMetricsFactory implements ChangeEventSourceMetricsFactory {
@Override
public <T extends CdcSourceTaskContext> SnapshotChangeEventSourceMetrics getSnapshotMetrics(T taskContext,
ChangeEventQueueMetrics changeEventQueueMetrics,
EventMetadataProvider eventMetadataProvider) {
return new SnapshotChangeEventSourceMetrics(taskContext, changeEventQueueMetrics, eventMetadataProvider);
}
@Override
public <T extends CdcSourceTaskContext> StreamingChangeEventSourceMetrics getStreamingMetrics(T taskContext,
ChangeEventQueueMetrics changeEventQueueMetrics,
EventMetadataProvider eventMetadataProvider) {
return new StreamingChangeEventSourceMetrics(taskContext, changeEventQueueMetrics, eventMetadataProvider);
}
}

View File

@ -13,6 +13,7 @@
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.metrics.Metrics;
import io.debezium.pipeline.MetadataEvent;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
@ -70,6 +71,10 @@ public void onErroneousEvent(String event) {
updateCommonEventMetrics();
}
@Override
public void onMetadataEvent(MetadataEvent event) {
}
@Override
public String getLastEvent() {
return lastEvent;

View File

@ -20,6 +20,7 @@
import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.pipeline.MetadataEvent;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
@ -98,6 +99,10 @@ public void onEvent(DataCollectionId source, OffsetContext offset, Object key, S
}
}
@Override
public void onMetadataEvent(MetadataEvent event) {
}
@Override
public String getLastTransactionId() {
return lastTransactionId.get();

View File

@ -0,0 +1,50 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.pipeline.metrics.spi;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
/**
* A factory for creating {@link SnapshotChangeEventSourceMetrics} and {@link StreamingChangeEventSourceMetrics}.
*
* @author Chris Cranford
*/
public interface ChangeEventSourceMetricsFactory {
/**
* Returns the snapshot change event source metrics.
*
* @param taskContext
* The task context
* @param changeEventQueueMetrics
* The change event queue metrics
* @param eventMetadataProvider
* The event metadata provider implementation
*
* @return a snapshot change event source metrics
*/
<T extends CdcSourceTaskContext> SnapshotChangeEventSourceMetrics getSnapshotMetrics(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics,
EventMetadataProvider eventMetadataProvider);
/**
* Returns the streaming change event source metrics.
*
* @param taskContext
* The task context
* @param changeEventQueueMetrics
* The change event queue metrics
* @param eventMetadataProvider
* The event metadata provider implementation
*
* @return a streaming change event source metrics
*/
<T extends CdcSourceTaskContext> StreamingChangeEventSourceMetrics getStreamingMetrics(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics,
EventMetadataProvider eventMetadataProvider);
}

View File

@ -8,6 +8,7 @@
import org.apache.kafka.connect.data.Struct;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.MetadataEvent;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.DataCollectionId;
@ -34,6 +35,12 @@ public interface DataChangeEventListener {
*/
void onErroneousEvent(String event);
/**
* Invoked for events that represent some metadata state change or event indicator.
* @param event
*/
void onMetadataEvent(MetadataEvent event);
static DataChangeEventListener NO_OP = new DataChangeEventListener() {
@Override
public void onFilteredEvent(String event) {
@ -43,6 +50,10 @@ public void onFilteredEvent(String event) {
public void onErroneousEvent(String event) {
}
@Override
public void onMetadataEvent(MetadataEvent event) {
}
@Override
public void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
}