DBZ-978 Added last event and captured tables metrics

This commit is contained in:
Jiri Pechanec 2018-12-05 09:02:54 +01:00 committed by Gunnar Morling
parent 91b571059e
commit edec1c3090
11 changed files with 95 additions and 20 deletions

View File

@ -5,6 +5,8 @@
*/
package io.debezium.connector.mongodb;
import java.util.Collections;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.common.CdcSourceTaskContext;
@ -28,7 +30,7 @@ public class MongoDbTaskContext extends CdcSourceTaskContext {
* @param config the configuration
*/
public MongoDbTaskContext(Configuration config) {
super("MongoDB", config.getString(MongoDbConnectorConfig.LOGICAL_NAME));
super("MongoDB", config.getString(MongoDbConnectorConfig.LOGICAL_NAME), Collections::emptySet);
final String serverName = config.getString(MongoDbConnectorConfig.LOGICAL_NAME);
this.filters = new Filters(config);

View File

@ -5,6 +5,7 @@
*/
package io.debezium.connector.mysql;
import java.util.Collections;
import java.util.Map;
import java.util.function.Predicate;
@ -53,7 +54,8 @@ public MySqlTaskContext(Configuration config) {
}
public MySqlTaskContext(Configuration config, Boolean tableIdCaseInsensitive) {
super("MySQL", config.getString(MySqlConnectorConfig.SERVER_NAME));
// MySQL now calculates JMX binlog reader metrics on its own
super("MySQL", config.getString(MySqlConnectorConfig.SERVER_NAME), Collections::emptyList);
this.config = config;
this.connectorConfig = new MySqlConnectorConfig(config);

View File

@ -7,6 +7,7 @@
package io.debezium.connector.postgresql;
import java.sql.SQLException;
import java.util.Collections;
import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.common.CdcSourceTaskContext;
@ -29,7 +30,7 @@ public class PostgresTaskContext extends CdcSourceTaskContext {
private final PostgresSchema schema;
protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, TopicSelector<TableId> topicSelector) {
super("Postgres", config.getLogicalName());
super("Postgres", config.getLogicalName(), Collections::emptySet);
this.config = config;
this.topicSelector = topicSelector;

View File

@ -5,11 +5,15 @@
*/
package io.debezium.connector.common;
import java.util.Collection;
import java.util.function.Supplier;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.kafka.connect.source.SourceTask;
import io.debezium.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext;
@ -20,13 +24,17 @@
*/
public class CdcSourceTaskContext {
private static final String[] EMPTY_CAPTURED_LIST = new String[0];
private final String connectorType;
private final String connectorName;
private final Clock clock;
private final Supplier<Collection<? extends DataCollectionId>> collectionsSupplier;
public CdcSourceTaskContext(String connectorType, String connectorName) {
public CdcSourceTaskContext(String connectorType, String connectorName, Supplier<Collection<? extends DataCollectionId>> collectionsSupplier) {
this.connectorType = connectorType;
this.connectorName = connectorName;
this.collectionsSupplier = collectionsSupplier;
this.clock = Clock.system();
}
@ -58,4 +66,20 @@ public Clock getClock() {
public ObjectName metricName(String contextName) throws MalformedObjectNameException {
return new ObjectName("debezium." + connectorType.toLowerCase() + ":type=connector-metrics,context=" + contextName + ",server=" + connectorName);
}
public String[] capturedDataCollections() {
if (collectionsSupplier == null) {
return EMPTY_CAPTURED_LIST;
}
final Collection<? extends DataCollectionId> collections = collectionsSupplier.get();
if (collections == null) {
return EMPTY_CAPTURED_LIST;
}
String[] ret = new String[collections.size()];
int i = 0;
for (DataCollectionId collection: collections) {
ret[i++] = collection.toString();
}
return ret;
}
}

View File

@ -80,6 +80,7 @@ public synchronized <T extends CdcSourceTaskContext> void start(T taskContext) {
if (running && snapshotResult.getStatus() == SnapshotResultStatus.COMPLETED) {
streamingSource = changeEventSourceFactory.getStreamingChangeEventSource(snapshotResult.getOffset());
eventDispatcher.setEventListener(streamingMetrics);
streamingMetrics.connected(true);
streamingSource.execute(context);
}
}
@ -90,6 +91,9 @@ public synchronized <T extends CdcSourceTaskContext> void start(T taskContext) {
catch (Exception e) {
errorHandler.setProducerThrowable(e);
}
finally {
streamingMetrics.connected(false);
}
});
}

View File

@ -21,6 +21,7 @@
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.spi.ChangeEventCreator;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.ChangeRecordEmitter.Receiver;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.schema.DataCollectionFilters.DataCollectionFilter;
@ -81,7 +82,6 @@ public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> t
public void dispatchSnapshotEvent(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter, SnapshotReceiver receiver) throws InterruptedException {
// TODO Handle Heartbeat
eventListener.onEvent();
DataCollectionSchema dataCollectionSchema = schema.schemaFor(dataCollectionId);
@ -90,7 +90,15 @@ public void dispatchSnapshotEvent(T dataCollectionId, ChangeRecordEmitter change
throw new IllegalArgumentException("No metadata registered for captured table " + dataCollectionId);
}
changeRecordEmitter.emitChangeRecords(dataCollectionSchema, receiver);
changeRecordEmitter.emitChangeRecords(dataCollectionSchema, new Receiver() {
@Override
public void changeRecord(DataCollectionSchema schema, Operation operation, Object key, Struct value,
OffsetContext offset) throws InterruptedException {
eventListener.onEvent("source = " + dataCollectionId + ", id = " + key + ", offset = " + offset);
receiver.changeRecord(dataCollectionSchema, operation, key, value, offset);
}
});
}
public SnapshotReceiver getSnapshotChangeEventReceiver() {
@ -105,9 +113,9 @@ public SnapshotReceiver getSnapshotChangeEventReceiver() {
* {@link ChangeEventCreator} for converting them into data change events.
*/
public void dispatchDataChangeEvent(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter) throws InterruptedException {
eventListener.onEvent();
if(!filter.isIncluded(dataCollectionId)) {
eventListener.onSkippedEvent("source = " + dataCollectionId);
LOGGER.trace("Skipping data change event for {}", dataCollectionId);
}
else {
@ -118,7 +126,15 @@ public void dispatchDataChangeEvent(T dataCollectionId, ChangeRecordEmitter chan
throw new IllegalArgumentException("No metadata registered for captured table " + dataCollectionId);
}
changeRecordEmitter.emitChangeRecords(dataCollectionSchema, streamingReceiver);
changeRecordEmitter.emitChangeRecords(dataCollectionSchema, new Receiver() {
@Override
public void changeRecord(DataCollectionSchema schema, Operation operation, Object key, Struct value,
OffsetContext offset) throws InterruptedException {
eventListener.onEvent("operation = " + operation + ", source = " + dataCollectionId + ", id = " + key + ", offset = " + offset);
streamingReceiver.changeRecord(schema, operation, key, value, offset);
}
});
}
heartbeat.heartbeat(

View File

@ -15,6 +15,7 @@ public interface ChangeEventSourceMetricsMXBean {
String getLastEvent();
long getMilliSecondsSinceLastEvent();
long getTotalNumberOfEventsSeen();
long getNumberOfEventsSkipped();
String[] getMonitoredTables();
void reset();
}

View File

@ -28,12 +28,14 @@
public abstract class Metrics implements DataChangeEventListener, ChangeEventSourceMetricsMXBean {
protected final AtomicLong totalNumberOfEventsSeen = new AtomicLong();
protected final AtomicLong numberOfEventsSkipped = new AtomicLong();
protected final AtomicLong lastEventTimestamp = new AtomicLong(-1);
private final String contextName;
protected final Clock clock;
private final CdcSourceTaskContext taskContext;
protected final CdcSourceTaskContext taskContext;
private volatile ObjectName name;
private volatile String lastEvent;
protected <T extends CdcSourceTaskContext> Metrics(T taskContext, String contextName) {
this.contextName = contextName;
@ -76,15 +78,25 @@ public final void unregister(Logger logger) {
}
@Override
public void onEvent() {
public void onEvent(String event) {
lastEvent = event;
updateCommonEventMetrics();
}
private void updateCommonEventMetrics() {
totalNumberOfEventsSeen.incrementAndGet();
lastEventTimestamp.set(clock.currentTimeInMillis());
}
// TODO DBZ-978
@Override
public void onSkippedEvent(String event) {
numberOfEventsSkipped.incrementAndGet();
updateCommonEventMetrics();
}
@Override
public String getLastEvent() {
return "not implemented";
return lastEvent;
}
@Override
@ -97,6 +109,11 @@ public long getTotalNumberOfEventsSeen() {
return totalNumberOfEventsSeen.get();
}
@Override
public long getNumberOfEventsSkipped() {
return numberOfEventsSkipped.get();
}
@Override
public void reset() {
totalNumberOfEventsSeen.set(0);

View File

@ -34,10 +34,8 @@ public class SnapshotChangeEventSourceMetrics extends Metrics implements Snapsho
private final AtomicLong stopTime = new AtomicLong();
private final ConcurrentMap<String, Long> rowsScanned = new ConcurrentHashMap<String, Long>();
// TODO DBZ-978 what's the purpose of the value here? It's never updated.
private final ConcurrentMap<String, String> remainingTables = new ConcurrentHashMap<>();
// TODO DBZ-978 Pull up to Metrics
private final Set<String> monitoredTables = Collections.synchronizedSet(new HashSet<>());
public <T extends CdcSourceTaskContext> SnapshotChangeEventSourceMetrics(T taskContext) {

View File

@ -17,7 +17,6 @@
@ThreadSafe
public class StreamingChangeEventSourceMetrics extends Metrics implements StreamingChangeEventSourceMetricsMXBean, DataChangeEventListener {
// DBZ-978 Toggle when losing the connection
private final AtomicBoolean connected = new AtomicBoolean();
public <T extends CdcSourceTaskContext> StreamingChangeEventSourceMetrics(T taskContext) {
@ -29,9 +28,12 @@ public boolean isConnected() {
return this.connected.get();
}
// TODO DBZ-978
@Override
public String[] getMonitoredTables() {
return new String[] {};
return taskContext.capturedDataCollections();
}
public void connected(boolean connected) {
this.connected.set(connected);
}
}

View File

@ -15,8 +15,16 @@
*/
public interface DataChangeEventListener {
// TODO DBZ-978 pass representation of incoming event
void onEvent();
void onEvent(String event);
void onSkippedEvent(String event);
static DataChangeEventListener NO_OP = () -> {};
static DataChangeEventListener NO_OP = new DataChangeEventListener() {
@Override
public void onSkippedEvent(String event) {
}
@Override
public void onEvent(String event) {
}
};
}