DBZ-1040 Metrics for internal queue

This commit is contained in:
Jiri Pechanec 2019-01-29 15:31:32 +01:00 committed by Gunnar Morling
parent d993ee265e
commit 213d4a42ca
13 changed files with 74 additions and 16 deletions

View File

@ -22,6 +22,7 @@
import com.github.shyiko.mysql.binlog.network.ServerException;
import io.debezium.config.ConfigurationDefaults;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.time.Temporals;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
@ -49,6 +50,7 @@ public abstract class AbstractReader implements Reader {
private final Metronome metronome;
private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
private final Duration pollInterval;
protected final ChangeEventQueueMetrics changeEventQueueMetrics;
private final HaltingPredicate acceptAndContinue;
@ -71,6 +73,18 @@ public AbstractReader(String name, MySqlTaskContext context, HaltingPredicate ac
this.pollInterval = context.getConnectorConfig().getPollInterval();
this.metronome = Metronome.parker(pollInterval, Clock.SYSTEM);
this.acceptAndContinue = acceptAndContinue == null? new AcceptAllPredicate() : acceptAndContinue;
this.changeEventQueueMetrics = new ChangeEventQueueMetrics() {
@Override
public int totalCapacity() {
return context.getConnectorConfig().getMaxQueueSize();
}
@Override
public int remainingCapacity() {
return records.remainingCapacity();
}
};
}
@Override

View File

@ -246,7 +246,7 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
client.setEventDeserializer(eventDeserializer);
// Set up for JMX ...
metrics = new BinlogReaderMetrics(client, context, name);
metrics = new BinlogReaderMetrics(client, context, name, changeEventQueueMetrics);
heartbeat = Heartbeat.create(context.config(), context.topicSelector().getHeartbeatTopic(),
context.getConnectorConfig().getLogicalName());
}

View File

@ -11,6 +11,7 @@
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientStatistics;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.pipeline.metrics.Metrics;
import io.debezium.util.Collect;
@ -28,8 +29,8 @@ class BinlogReaderMetrics extends Metrics implements BinlogReaderMetricsMXBean {
private final AtomicLong numberOfNotWellFormedTransactions = new AtomicLong();
private final AtomicLong numberOfLargeTransactions = new AtomicLong();
public BinlogReaderMetrics(BinaryLogClient client, MySqlTaskContext taskContext, String name) {
super(taskContext, name);
public BinlogReaderMetrics(BinaryLogClient client, MySqlTaskContext taskContext, String name, ChangeEventQueueMetrics changeEventQueueMetrics) {
super(taskContext, name, changeEventQueueMetrics);
this.client = client;
this.stats = new BinaryLogClientStatistics(client);
this.schema = taskContext.dbSchema();

View File

@ -78,7 +78,7 @@ public SnapshotReader(String name, MySqlTaskContext context) {
this.includeData = context.snapshotMode().includeData();
this.snapshotLockingMode = context.getConnectorConfig().getSnapshotLockingMode();
recorder = this::recordRowAsRead;
metrics = new SnapshotReaderMetrics(context, context.dbSchema());
metrics = new SnapshotReaderMetrics(context, context.dbSchema(), changeEventQueueMetrics);
}
/**

View File

@ -7,6 +7,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
/**
@ -19,8 +20,8 @@ class SnapshotReaderMetrics extends SnapshotChangeEventSourceMetrics implements
private final MySqlSchema schema;
public SnapshotReaderMetrics(MySqlTaskContext taskContext, MySqlSchema schema) {
super(taskContext);
public SnapshotReaderMetrics(MySqlTaskContext taskContext, MySqlSchema schema, ChangeEventQueueMetrics changeEventQueueMetrics) {
super(taskContext, changeEventQueueMetrics);
this.schema = schema;
}

View File

@ -124,7 +124,7 @@ public void start(Configuration config) {
dispatcher
);
coordinator.start(taskContext);
coordinator.start(taskContext, this.queue);
}
/**

View File

@ -51,12 +51,13 @@
* producers to the consumer, a custom type wrapping source records
* may be used.
*/
public class ChangeEventQueue<T> {
public class ChangeEventQueue<T> implements ChangeEventQueueMetrics {
private static final Logger LOGGER = LoggerFactory.getLogger(ChangeEventQueue.class);
private final Duration pollInterval;
private final int maxBatchSize;
private final int maxQueueSize;
private final BlockingQueue<T> queue;
private final Metronome metronome;
private final Supplier<PreviousContext> loggingContextSupplier;
@ -66,6 +67,7 @@ public class ChangeEventQueue<T> {
private ChangeEventQueue(Duration pollInterval, int maxQueueSize, int maxBatchSize, Supplier<LoggingContext.PreviousContext> loggingContextSupplier) {
this.pollInterval = pollInterval;
this.maxBatchSize = maxBatchSize;
this.maxQueueSize = maxQueueSize;
this.queue = new LinkedBlockingDeque<>(maxQueueSize);
this.metronome = Metronome.sleeper(pollInterval, Clock.SYSTEM);
this.loggingContextSupplier = loggingContextSupplier;
@ -168,4 +170,14 @@ private void throwProducerFailureIfPresent() {
throw new ConnectException("An exception ocurred in the change event producer. This connector will be stopped.", producerFailure);
}
}
@Override
public int totalCapacity() {
return maxQueueSize;
}
@Override
public int remainingCapacity() {
return queue.remainingCapacity();
}
}

View File

@ -0,0 +1,12 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.base;
public interface ChangeEventQueueMetrics {
int totalCapacity();
int remainingCapacity();
}

View File

@ -15,6 +15,7 @@
import org.slf4j.LoggerFactory;
import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics;
@ -60,9 +61,9 @@ public ChangeEventSourceCoordinator(OffsetContext previousOffset, ErrorHandler e
this.eventDispatcher = eventDispatcher;
}
public synchronized <T extends CdcSourceTaskContext> void start(T taskContext) {
this.snapshotMetrics = new SnapshotChangeEventSourceMetrics(taskContext);
this.streamingMetrics = new StreamingChangeEventSourceMetrics(taskContext);
public synchronized <T extends CdcSourceTaskContext> void start(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics) {
this.snapshotMetrics = new SnapshotChangeEventSourceMetrics(taskContext, changeEventQueueMetrics);
this.streamingMetrics = new StreamingChangeEventSourceMetrics(taskContext, changeEventQueueMetrics);
running = true;
// run the snapshot source on a separate thread so start() won't block

View File

@ -17,5 +17,7 @@ public interface ChangeEventSourceMetricsMXBean {
long getTotalNumberOfEventsSeen();
long getNumberOfEventsSkipped();
String[] getMonitoredTables();
int getQueueTotalCapacity();
int getQueueRemainingCapacity();
void reset();
}

View File

@ -16,6 +16,7 @@
import org.slf4j.Logger;
import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
@ -38,12 +39,14 @@ public abstract class Metrics implements DataChangeEventListener, ChangeEventSou
private final String contextName;
protected final Clock clock;
protected final CdcSourceTaskContext taskContext;
private final ChangeEventQueueMetrics changeEventQueueMetrics;
private volatile ObjectName name;
protected <T extends CdcSourceTaskContext> Metrics(T taskContext, String contextName) {
protected <T extends CdcSourceTaskContext> Metrics(T taskContext, String contextName, ChangeEventQueueMetrics changeEventQueueMetrics) {
this.contextName = contextName;
this.taskContext = taskContext;
this.clock = taskContext.getClock();
this.changeEventQueueMetrics = changeEventQueueMetrics;
}
/**
@ -124,4 +127,14 @@ public void reset() {
numberOfEventsSkipped.set(0);
lastEvent = null;
}
@Override
public int getQueueTotalCapacity() {
return changeEventQueueMetrics.totalCapacity();
}
@Override
public int getQueueRemainingCapacity() {
return changeEventQueueMetrics.remainingCapacity();
}
}

View File

@ -15,6 +15,7 @@
import java.util.concurrent.atomic.AtomicLong;
import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.relational.TableId;
@ -38,8 +39,8 @@ public class SnapshotChangeEventSourceMetrics extends Metrics implements Snapsho
private final Set<String> monitoredTables = Collections.synchronizedSet(new HashSet<>());
public <T extends CdcSourceTaskContext> SnapshotChangeEventSourceMetrics(T taskContext) {
super(taskContext, "snapshot");
public <T extends CdcSourceTaskContext> SnapshotChangeEventSourceMetrics(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics) {
super(taskContext, "snapshot", changeEventQueueMetrics);
}
@Override

View File

@ -14,6 +14,7 @@
import org.apache.kafka.connect.data.Struct;
import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
@ -31,8 +32,8 @@ public class StreamingChangeEventSourceMetrics extends Metrics implements Stream
private final AtomicReference<Map<String, String>> sourceEventPosition = new AtomicReference<Map<String, String>>(Collections.emptyMap());
private volatile String lastTransactionId;
public <T extends CdcSourceTaskContext> StreamingChangeEventSourceMetrics(T taskContext) {
super(taskContext, "streaming");
public <T extends CdcSourceTaskContext> StreamingChangeEventSourceMetrics(T taskContext, ChangeEventQueueMetrics changeEventQueueMetrics) {
super(taskContext, "streaming", changeEventQueueMetrics);
}
@Override