DBZ-4459: Introduce interfaces and default implementations for change event source metrics

This commit is contained in:
Sergei Morozov 2021-12-15 14:11:14 -08:00 committed by Gunnar Morling
parent 2beefb9d8f
commit 0329859cda
14 changed files with 409 additions and 345 deletions

View File

@ -12,14 +12,14 @@
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.mongodb.DisconnectEvent;
import io.debezium.pipeline.ConnectorEvent;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.DefaultSnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
/**
* @author Chris Cranford
*/
@ThreadSafe
public class MongoDbSnapshotChangeEventSourceMetrics extends SnapshotChangeEventSourceMetrics implements MongoDbSnapshotChangeEventSourceMetricsMBean {
public class MongoDbSnapshotChangeEventSourceMetrics extends DefaultSnapshotChangeEventSourceMetrics implements MongoDbSnapshotChangeEventSourceMetricsMBean {
private AtomicLong numberOfDisconnects = new AtomicLong();

View File

@ -13,14 +13,14 @@
import io.debezium.connector.mongodb.DisconnectEvent;
import io.debezium.connector.mongodb.PrimaryElectionEvent;
import io.debezium.pipeline.ConnectorEvent;
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.DefaultStreamingChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
/**
* @author Chris Cranford
*/
@ThreadSafe
public class MongoDbStreamingChangeEventSourceMetrics extends StreamingChangeEventSourceMetrics implements MongoDbStreamingChangeEventSourceMetricsMBean {
public class MongoDbStreamingChangeEventSourceMetrics extends DefaultStreamingChangeEventSourceMetrics implements MongoDbStreamingChangeEventSourceMetricsMBean {
private AtomicLong numberOfPrimaryElections = new AtomicLong();
private AtomicLong numberOfDisconnects = new AtomicLong();

View File

@ -8,14 +8,14 @@
import java.util.concurrent.atomic.AtomicBoolean;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.DefaultSnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
/**
* @author Randall Hauch
*
*/
class MySqlSnapshotChangeEventSourceMetrics extends SnapshotChangeEventSourceMetrics implements MySqlSnapshotChangeEventSourceMetricsMXBean {
class MySqlSnapshotChangeEventSourceMetrics extends DefaultSnapshotChangeEventSourceMetrics implements MySqlSnapshotChangeEventSourceMetricsMXBean {
private final AtomicBoolean holdingGlobalLock = new AtomicBoolean();

View File

@ -14,14 +14,14 @@
import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientStatistics;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.DefaultStreamingChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.util.Collect;
/**
* @author Randall Hauch
*/
public class MySqlStreamingChangeEventSourceMetrics extends StreamingChangeEventSourceMetrics implements MySqlStreamingChangeEventSourceMetricsMXBean {
public class MySqlStreamingChangeEventSourceMetrics extends DefaultStreamingChangeEventSourceMetrics implements MySqlStreamingChangeEventSourceMetricsMXBean {
private final BinaryLogClient client;
private final BinaryLogClientStatistics stats;

View File

@ -8,13 +8,13 @@
import java.util.concurrent.atomic.AtomicBoolean;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.DefaultSnapshotChangeEventSourceMetrics;
/**
* @author Randall Hauch
*
*/
class SnapshotReaderMetrics extends SnapshotChangeEventSourceMetrics implements SnapshotReaderMetricsMXBean {
class SnapshotReaderMetrics extends DefaultSnapshotChangeEventSourceMetrics implements SnapshotReaderMetricsMXBean {
private final AtomicBoolean holdingGlobalLock = new AtomicBoolean();

View File

@ -24,14 +24,14 @@
import io.debezium.annotation.VisibleForTesting;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.DefaultStreamingChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
/**
* The metrics implementation for Oracle connector streaming phase.
*/
@ThreadSafe
public class OracleStreamingChangeEventSourceMetrics extends StreamingChangeEventSourceMetrics implements OracleStreamingChangeEventSourceMetricsMXBean {
public class OracleStreamingChangeEventSourceMetrics extends DefaultStreamingChangeEventSourceMetrics implements OracleStreamingChangeEventSourceMetricsMXBean {
private static final Logger LOGGER = LoggerFactory.getLogger(OracleStreamingChangeEventSourceMetrics.class);

View File

@ -0,0 +1,20 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.pipeline.metrics;
import org.slf4j.Logger;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
/**
* Common API for all change event source metrics regardless of the connector phase.
*/
interface ChangeEventSourceMetrics extends DataChangeEventListener {
void register(Logger logger);
void unregister(Logger logger);
}

View File

@ -18,13 +18,13 @@ public class DefaultChangeEventSourceMetricsFactory implements ChangeEventSource
public <T extends CdcSourceTaskContext> SnapshotChangeEventSourceMetrics getSnapshotMetrics(T taskContext,
ChangeEventQueueMetrics changeEventQueueMetrics,
EventMetadataProvider eventMetadataProvider) {
return new SnapshotChangeEventSourceMetrics(taskContext, changeEventQueueMetrics, eventMetadataProvider);
return new DefaultSnapshotChangeEventSourceMetrics(taskContext, changeEventQueueMetrics, eventMetadataProvider);
}
@Override
public <T extends CdcSourceTaskContext> StreamingChangeEventSourceMetrics getStreamingMetrics(T taskContext,
ChangeEventQueueMetrics changeEventQueueMetrics,
EventMetadataProvider eventMetadataProvider) {
return new StreamingChangeEventSourceMetrics(taskContext, changeEventQueueMetrics, eventMetadataProvider);
return new DefaultStreamingChangeEventSourceMetrics(taskContext, changeEventQueueMetrics, eventMetadataProvider);
}
}

View File

@ -0,0 +1,222 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.pipeline.metrics;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
/**
* The default implementation of metrics related to the snapshot phase of a connector.
*
* @author Randall Hauch, Jiri Pechanec
*/
@ThreadSafe
public class DefaultSnapshotChangeEventSourceMetrics extends PipelineMetrics
implements SnapshotChangeEventSourceMetrics, SnapshotChangeEventSourceMetricsMXBean {
private final AtomicBoolean snapshotRunning = new AtomicBoolean();
private final AtomicBoolean snapshotCompleted = new AtomicBoolean();
private final AtomicBoolean snapshotAborted = new AtomicBoolean();
private final AtomicLong startTime = new AtomicLong();
private final AtomicLong stopTime = new AtomicLong();
private final ConcurrentMap<String, Long> rowsScanned = new ConcurrentHashMap<String, Long>();
private final ConcurrentMap<String, String> remainingTables = new ConcurrentHashMap<>();
private final AtomicReference<String> chunkId = new AtomicReference<>();
private final AtomicReference<Object[]> chunkFrom = new AtomicReference<>();
private final AtomicReference<Object[]> chunkTo = new AtomicReference<>();
private final AtomicReference<Object[]> tableFrom = new AtomicReference<>();
private final AtomicReference<Object[]> tableTo = new AtomicReference<>();
private final Set<String> capturedTables = Collections.synchronizedSet(new HashSet<>());
public <T extends CdcSourceTaskContext> DefaultSnapshotChangeEventSourceMetrics(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics,
EventMetadataProvider metadataProvider) {
super(taskContext, "snapshot", changeEventQueueMetrics, metadataProvider);
}
@Override
public int getTotalTableCount() {
return this.capturedTables.size();
}
@Override
public int getRemainingTableCount() {
return this.remainingTables.size();
}
@Override
public boolean getSnapshotRunning() {
return this.snapshotRunning.get();
}
@Override
public boolean getSnapshotCompleted() {
return this.snapshotCompleted.get();
}
@Override
public boolean getSnapshotAborted() {
return this.snapshotAborted.get();
}
@Override
public long getSnapshotDurationInSeconds() {
final long startMillis = startTime.get();
if (startMillis <= 0L) {
return 0;
}
long stopMillis = stopTime.get();
if (stopMillis == 0L) {
stopMillis = clock.currentTimeInMillis();
}
return (stopMillis - startMillis) / 1000L;
}
/**
* @deprecated Superseded by the 'Captured Tables' metric. Use {@link #getCapturedTables()}.
* Scheduled for removal in a future release.
*/
@Override
@Deprecated
public String[] getMonitoredTables() {
return capturedTables.toArray(new String[capturedTables.size()]);
}
@Override
public String[] getCapturedTables() {
return capturedTables.toArray(new String[capturedTables.size()]);
}
@Override
public void monitoredDataCollectionsDetermined(Iterable<? extends DataCollectionId> dataCollectionIds) {
Iterator<? extends DataCollectionId> it = dataCollectionIds.iterator();
while (it.hasNext()) {
DataCollectionId dataCollectionId = it.next();
this.remainingTables.put(dataCollectionId.identifier(), "");
capturedTables.add(dataCollectionId.identifier());
}
}
@Override
public void dataCollectionSnapshotCompleted(DataCollectionId dataCollectionId, long numRows) {
rowsScanned.put(dataCollectionId.identifier(), numRows);
remainingTables.remove(dataCollectionId.identifier());
}
@Override
public void snapshotStarted() {
this.snapshotRunning.set(true);
this.snapshotCompleted.set(false);
this.snapshotAborted.set(false);
this.startTime.set(clock.currentTimeInMillis());
this.stopTime.set(0L);
}
@Override
public void snapshotCompleted() {
this.snapshotCompleted.set(true);
this.snapshotAborted.set(false);
this.snapshotRunning.set(false);
this.stopTime.set(clock.currentTimeInMillis());
}
@Override
public void snapshotAborted() {
this.snapshotCompleted.set(false);
this.snapshotAborted.set(true);
this.snapshotRunning.set(false);
this.stopTime.set(clock.currentTimeInMillis());
}
@Override
public void rowsScanned(TableId tableId, long numRows) {
rowsScanned.put(tableId.toString(), numRows);
}
@Override
public ConcurrentMap<String, Long> getRowsScanned() {
return rowsScanned;
}
@Override
public void currentChunk(String chunkId, Object[] chunkFrom, Object[] chunkTo) {
this.chunkId.set(chunkId);
this.chunkFrom.set(chunkFrom);
this.chunkTo.set(chunkTo);
}
@Override
public void currentChunk(String chunkId, Object[] chunkFrom, Object[] chunkTo, Object tableTo[]) {
currentChunk(chunkId, chunkFrom, chunkTo);
this.tableFrom.set(chunkFrom);
this.tableTo.set(tableTo);
}
@Override
public String getChunkId() {
return chunkId.get();
}
@Override
public String getChunkFrom() {
return arrayToString(chunkFrom.get());
}
@Override
public String getChunkTo() {
return arrayToString(chunkTo.get());
}
@Override
public String getTableFrom() {
return arrayToString(tableFrom.get());
}
@Override
public String getTableTo() {
return arrayToString(tableTo.get());
}
private String arrayToString(Object[] array) {
return (array == null) ? null : Arrays.toString(array);
}
@Override
public void reset() {
super.reset();
snapshotRunning.set(false);
snapshotCompleted.set(false);
snapshotAborted.set(false);
startTime.set(0);
stopTime.set(0);
rowsScanned.clear();
remainingTables.clear();
capturedTables.clear();
chunkId.set(null);
chunkFrom.set(null);
chunkTo.set(null);
tableFrom.set(null);
tableTo.set(null);
}
}

View File

@ -0,0 +1,133 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.pipeline.metrics;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.data.Envelope.Operation;
import io.debezium.pipeline.ConnectorEvent;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.DataCollectionId;
/**
* The default implementation of metrics related to the streaming phase of a connector.
*
* @author Randall Hauch, Jiri Pechanec
*/
@ThreadSafe
public class DefaultStreamingChangeEventSourceMetrics extends PipelineMetrics
implements StreamingChangeEventSourceMetrics, StreamingChangeEventSourceMetricsMXBean {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultStreamingChangeEventSourceMetrics.class);
private final AtomicBoolean connected = new AtomicBoolean();
private final AtomicReference<Duration> lagBehindSource = new AtomicReference<>();
private final AtomicLong numberOfCommittedTransactions = new AtomicLong();
private final AtomicReference<Map<String, String>> sourceEventPosition = new AtomicReference<Map<String, String>>(Collections.emptyMap());
private final AtomicReference<String> lastTransactionId = new AtomicReference<>();
public <T extends CdcSourceTaskContext> DefaultStreamingChangeEventSourceMetrics(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics,
EventMetadataProvider metadataProvider) {
super(taskContext, "streaming", changeEventQueueMetrics, metadataProvider);
}
@Override
public boolean isConnected() {
return this.connected.get();
}
/**
* @deprecated Superseded by the 'Captured Tables' metric. Use {@link #getCapturedTables()}.
* Scheduled for removal in a future release.
*/
@Override
@Deprecated
public String[] getMonitoredTables() {
return taskContext.capturedDataCollections();
}
@Override
public String[] getCapturedTables() {
return taskContext.capturedDataCollections();
}
public void connected(boolean connected) {
this.connected.set(connected);
LOGGER.info("Connected metrics set to '{}'", this.connected.get());
}
@Override
public Map<String, String> getSourceEventPosition() {
return sourceEventPosition.get();
}
@Override
public long getMilliSecondsBehindSource() {
Duration lag = lagBehindSource.get();
return lag != null ? lag.toMillis() : -1;
}
@Override
public long getNumberOfCommittedTransactions() {
return numberOfCommittedTransactions.get();
}
@Override
public void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value, Operation operation) {
super.onEvent(source, offset, key, value, operation);
final Instant eventTimestamp = metadataProvider.getEventTimestamp(source, offset, key, value);
if (eventTimestamp != null) {
lagBehindSource.set(Duration.between(eventTimestamp, Instant.now()));
}
final String transactionId = metadataProvider.getTransactionId(source, offset, key, value);
if (transactionId != null) {
if (!transactionId.equals(lastTransactionId.get())) {
lastTransactionId.set(transactionId);
numberOfCommittedTransactions.incrementAndGet();
}
}
final Map<String, String> eventSource = metadataProvider.getEventSourcePosition(source, offset, key, value);
if (eventSource != null) {
sourceEventPosition.set(eventSource);
}
}
@Override
public void onConnectorEvent(ConnectorEvent event) {
}
@Override
public String getLastTransactionId() {
return lastTransactionId.get();
}
@Override
public void reset() {
super.reset();
connected.set(false);
lagBehindSource.set(null);
numberOfCommittedTransactions.set(0);
sourceEventPosition.set(Collections.emptyMap());
lastTransactionId.set(null);
}
}

View File

@ -5,218 +5,10 @@
*/
package io.debezium.pipeline.metrics;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
/**
* Metrics related to the initial snapshot of a connector.
*
* @author Randall Hauch, Jiri Pechanec
* Metrics related to the snapshot phase of a connector.
*/
@ThreadSafe
public class SnapshotChangeEventSourceMetrics extends PipelineMetrics implements SnapshotChangeEventSourceMetricsMXBean, SnapshotProgressListener {
private final AtomicBoolean snapshotRunning = new AtomicBoolean();
private final AtomicBoolean snapshotCompleted = new AtomicBoolean();
private final AtomicBoolean snapshotAborted = new AtomicBoolean();
private final AtomicLong startTime = new AtomicLong();
private final AtomicLong stopTime = new AtomicLong();
private final ConcurrentMap<String, Long> rowsScanned = new ConcurrentHashMap<String, Long>();
private final ConcurrentMap<String, String> remainingTables = new ConcurrentHashMap<>();
private final AtomicReference<String> chunkId = new AtomicReference<>();
private final AtomicReference<Object[]> chunkFrom = new AtomicReference<>();
private final AtomicReference<Object[]> chunkTo = new AtomicReference<>();
private final AtomicReference<Object[]> tableFrom = new AtomicReference<>();
private final AtomicReference<Object[]> tableTo = new AtomicReference<>();
private final Set<String> capturedTables = Collections.synchronizedSet(new HashSet<>());
public <T extends CdcSourceTaskContext> SnapshotChangeEventSourceMetrics(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics,
EventMetadataProvider metadataProvider) {
super(taskContext, "snapshot", changeEventQueueMetrics, metadataProvider);
}
@Override
public int getTotalTableCount() {
return this.capturedTables.size();
}
@Override
public int getRemainingTableCount() {
return this.remainingTables.size();
}
@Override
public boolean getSnapshotRunning() {
return this.snapshotRunning.get();
}
@Override
public boolean getSnapshotCompleted() {
return this.snapshotCompleted.get();
}
@Override
public boolean getSnapshotAborted() {
return this.snapshotAborted.get();
}
@Override
public long getSnapshotDurationInSeconds() {
final long startMillis = startTime.get();
if (startMillis <= 0L) {
return 0;
}
long stopMillis = stopTime.get();
if (stopMillis == 0L) {
stopMillis = clock.currentTimeInMillis();
}
return (stopMillis - startMillis) / 1000L;
}
/**
* @deprecated Superseded by the 'Captured Tables' metric. Use {@link #getCapturedTables()}.
* Scheduled for removal in a future release.
*/
@Override
@Deprecated
public String[] getMonitoredTables() {
return capturedTables.toArray(new String[capturedTables.size()]);
}
@Override
public String[] getCapturedTables() {
return capturedTables.toArray(new String[capturedTables.size()]);
}
@Override
public void monitoredDataCollectionsDetermined(Iterable<? extends DataCollectionId> dataCollectionIds) {
Iterator<? extends DataCollectionId> it = dataCollectionIds.iterator();
while (it.hasNext()) {
DataCollectionId dataCollectionId = it.next();
this.remainingTables.put(dataCollectionId.identifier(), "");
capturedTables.add(dataCollectionId.identifier());
}
}
@Override
public void dataCollectionSnapshotCompleted(DataCollectionId dataCollectionId, long numRows) {
rowsScanned.put(dataCollectionId.identifier(), numRows);
remainingTables.remove(dataCollectionId.identifier());
}
@Override
public void snapshotStarted() {
this.snapshotRunning.set(true);
this.snapshotCompleted.set(false);
this.snapshotAborted.set(false);
this.startTime.set(clock.currentTimeInMillis());
this.stopTime.set(0L);
}
@Override
public void snapshotCompleted() {
this.snapshotCompleted.set(true);
this.snapshotAborted.set(false);
this.snapshotRunning.set(false);
this.stopTime.set(clock.currentTimeInMillis());
}
@Override
public void snapshotAborted() {
this.snapshotCompleted.set(false);
this.snapshotAborted.set(true);
this.snapshotRunning.set(false);
this.stopTime.set(clock.currentTimeInMillis());
}
@Override
public void rowsScanned(TableId tableId, long numRows) {
rowsScanned.put(tableId.toString(), numRows);
}
@Override
public ConcurrentMap<String, Long> getRowsScanned() {
return rowsScanned;
}
@Override
public void currentChunk(String chunkId, Object[] chunkFrom, Object[] chunkTo) {
this.chunkId.set(chunkId);
this.chunkFrom.set(chunkFrom);
this.chunkTo.set(chunkTo);
}
@Override
public void currentChunk(String chunkId, Object[] chunkFrom, Object[] chunkTo, Object tableTo[]) {
currentChunk(chunkId, chunkFrom, chunkTo);
this.tableFrom.set(chunkFrom);
this.tableTo.set(tableTo);
}
@Override
public String getChunkId() {
return chunkId.get();
}
@Override
public String getChunkFrom() {
return arrayToString(chunkFrom.get());
}
@Override
public String getChunkTo() {
return arrayToString(chunkTo.get());
}
@Override
public String getTableFrom() {
return arrayToString(tableFrom.get());
}
@Override
public String getTableTo() {
return arrayToString(tableTo.get());
}
private String arrayToString(Object[] array) {
return (array == null) ? null : Arrays.toString(array);
}
@Override
public void reset() {
super.reset();
snapshotRunning.set(false);
snapshotCompleted.set(false);
snapshotAborted.set(false);
startTime.set(0);
stopTime.set(0);
rowsScanned.clear();
remainingTables.clear();
capturedTables.clear();
chunkId.set(null);
chunkFrom.set(null);
chunkTo.set(null);
tableFrom.set(null);
tableTo.set(null);
}
public interface SnapshotChangeEventSourceMetrics extends ChangeEventSourceMetrics, SnapshotProgressListener {
}

View File

@ -5,127 +5,10 @@
*/
package io.debezium.pipeline.metrics;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.data.Envelope.Operation;
import io.debezium.pipeline.ConnectorEvent;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.DataCollectionId;
import io.debezium.pipeline.source.spi.StreamingProgressListener;
/**
* @author Randall Hauch, Jiri Pechanec
* Metrics related to the streaming phase of a connector.
*/
@ThreadSafe
public class StreamingChangeEventSourceMetrics extends PipelineMetrics implements StreamingChangeEventSourceMetricsMXBean, DataChangeEventListener {
private static final Logger LOGGER = LoggerFactory.getLogger(StreamingChangeEventSourceMetrics.class);
private final AtomicBoolean connected = new AtomicBoolean();
private final AtomicReference<Duration> lagBehindSource = new AtomicReference<>();
private final AtomicLong numberOfCommittedTransactions = new AtomicLong();
private final AtomicReference<Map<String, String>> sourceEventPosition = new AtomicReference<Map<String, String>>(Collections.emptyMap());
private final AtomicReference<String> lastTransactionId = new AtomicReference<>();
public <T extends CdcSourceTaskContext> StreamingChangeEventSourceMetrics(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics,
EventMetadataProvider metadataProvider) {
super(taskContext, "streaming", changeEventQueueMetrics, metadataProvider);
}
@Override
public boolean isConnected() {
return this.connected.get();
}
/**
* @deprecated Superseded by the 'Captured Tables' metric. Use {@link #getCapturedTables()}.
* Scheduled for removal in a future release.
*/
@Override
@Deprecated
public String[] getMonitoredTables() {
return taskContext.capturedDataCollections();
}
@Override
public String[] getCapturedTables() {
return taskContext.capturedDataCollections();
}
public void connected(boolean connected) {
this.connected.set(connected);
LOGGER.info("Connected metrics set to '{}'", this.connected.get());
}
@Override
public Map<String, String> getSourceEventPosition() {
return sourceEventPosition.get();
}
@Override
public long getMilliSecondsBehindSource() {
Duration lag = lagBehindSource.get();
return lag != null ? lag.toMillis() : -1;
}
@Override
public long getNumberOfCommittedTransactions() {
return numberOfCommittedTransactions.get();
}
@Override
public void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value, Operation operation) {
super.onEvent(source, offset, key, value, operation);
final Instant eventTimestamp = metadataProvider.getEventTimestamp(source, offset, key, value);
if (eventTimestamp != null) {
lagBehindSource.set(Duration.between(eventTimestamp, Instant.now()));
}
final String transactionId = metadataProvider.getTransactionId(source, offset, key, value);
if (transactionId != null) {
if (!transactionId.equals(lastTransactionId.get())) {
lastTransactionId.set(transactionId);
numberOfCommittedTransactions.incrementAndGet();
}
}
final Map<String, String> eventSource = metadataProvider.getEventSourcePosition(source, offset, key, value);
if (eventSource != null) {
sourceEventPosition.set(eventSource);
}
}
@Override
public void onConnectorEvent(ConnectorEvent event) {
}
@Override
public String getLastTransactionId() {
return lastTransactionId.get();
}
@Override
public void reset() {
super.reset();
connected.set(false);
lagBehindSource.set(null);
numberOfCommittedTransactions.set(0);
sourceEventPosition.set(Collections.emptyMap());
lastTransactionId.set(null);
}
public interface StreamingChangeEventSourceMetrics extends ChangeEventSourceMetrics, StreamingProgressListener {
}

View File

@ -9,7 +9,7 @@
import io.debezium.schema.DataCollectionId;
/**
* A class invoked by {@link SnapshotChangeEventSource} whenever an important event or change of state happens.
* Invoked whenever an important event or change of state happens during the snapshot phase.
*
* @author Jiri Pechanec
*/

View File

@ -0,0 +1,14 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.pipeline.source.spi;
/**
* Invoked whenever an important event or change of state happens during the streaming phase.
*/
public interface StreamingProgressListener {
void connected(boolean connected);
}