diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Module.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Module.java index fd8acec1e..652b85317 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Module.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Module.java @@ -22,7 +22,17 @@ public static String version() { return INFO.getProperty("version"); } + /** + * @return symbolic name of the connector plugin + */ public static String name() { return "mongodb"; } + + /** + * @return context name used in log MDC and JMX metrics + */ + public static String contextName() { + return "MongoDB"; + } } diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java index 655a8c6ef..45245b14d 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java @@ -391,4 +391,9 @@ protected SourceInfoStructMaker getSourceInfoStruc return new MongoDbSourceInfoStructMaker(Module.name(), Module.version(), this); } } + + @Override + public String getContextName() { + return Module.contextName(); + } } diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbTaskContext.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbTaskContext.java index 9a4f378ca..20f72401d 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbTaskContext.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbTaskContext.java @@ -28,7 +28,7 @@ public class MongoDbTaskContext extends CdcSourceTaskContext { * @param config the configuration */ public MongoDbTaskContext(Configuration config) { - super("MongoDB", config.getString(MongoDbConnectorConfig.LOGICAL_NAME), Collections::emptySet); + super(Module.contextName(), config.getString(MongoDbConnectorConfig.LOGICAL_NAME), Collections::emptySet); final String serverName = config.getString(MongoDbConnectorConfig.LOGICAL_NAME); this.filters = new Filters(config); diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReaderMetrics.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReaderMetrics.java index e09ebe677..e003662f4 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReaderMetrics.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReaderMetrics.java @@ -14,13 +14,13 @@ import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientStatistics; import io.debezium.connector.base.ChangeEventQueueMetrics; -import io.debezium.pipeline.metrics.Metrics; +import io.debezium.pipeline.metrics.PipelineMetrics; import io.debezium.util.Collect; /** * @author Randall Hauch */ -class BinlogReaderMetrics extends Metrics implements BinlogReaderMetricsMXBean { +class BinlogReaderMetrics extends PipelineMetrics implements BinlogReaderMetricsMXBean { private final BinaryLogClient client; private final BinaryLogClientStatistics stats; diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Module.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Module.java index 2cedb6d05..eb6a35269 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Module.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Module.java @@ -22,7 +22,17 @@ public static String version() { return INFO.getProperty("version"); } + /** + * @return symbolic name of the connector plugin + */ public static String name() { return "mysql"; } + + /** + * @return context name used in log MDC and JMX metrics + */ + public static String contextName() { + return "MySQL"; + } } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java index 29e94d76f..e37b72289 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java @@ -1191,4 +1191,9 @@ protected SourceInfoStructMaker getSourceInfoStruc return new MySqlSourceInfoStructMaker(Module.name(), Module.version(), this); } } + + @Override + public String getContextName() { + return Module.contextName(); + } } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java index 0c16f3988..b562a37e8 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java @@ -63,7 +63,7 @@ public String version() { @Override public synchronized void start(Configuration config) { final String serverName = config.getString(MySqlConnectorConfig.SERVER_NAME); - PreviousContext prevLoggingContext = LoggingContext.forConnector("MySQL", serverName, "task"); + PreviousContext prevLoggingContext = LoggingContext.forConnector(Module.contextName(), serverName, "task"); try { // Get the offsets for our partition ... diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java index 2dc3d675e..dd4691e6e 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java @@ -35,6 +35,7 @@ import io.debezium.relational.ddl.DdlChanges.DatabaseStatementStringConsumer; import io.debezium.relational.ddl.DdlParser; import io.debezium.relational.history.DatabaseHistory; +import io.debezium.relational.history.DatabaseHistoryMetrics; import io.debezium.relational.history.HistoryRecordComparator; import io.debezium.schema.TopicSelector; import io.debezium.text.MultipleParsingExceptions; @@ -129,7 +130,7 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) { return SourceInfo.isPositionAtOrBefore(recorded, desired, gtidFilter); } }; - this.dbHistory.configure(dbHistoryConfig, historyComparator); // validates + this.dbHistory.configure(dbHistoryConfig, historyComparator, new DatabaseHistoryMetrics(configuration)); // validates } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java index 944575f92..9bfb42918 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java @@ -57,7 +57,7 @@ public MySqlTaskContext(Configuration config, Filters filters, Map re } public MySqlTaskContext(Configuration config, Filters filters, Boolean tableIdCaseInsensitive, Map restartOffset) { - super("MySQL", config.getString(MySqlConnectorConfig.SERVER_NAME), Collections::emptyList); + super(Module.contextName(), config.getString(MySqlConnectorConfig.SERVER_NAME), Collections::emptyList); this.config = config; this.connectorConfig = new MySqlConnectorConfig(config); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/Module.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/Module.java index 5508f264a..b891284f2 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/Module.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/Module.java @@ -22,7 +22,17 @@ public static String version() { return INFO.getProperty("version"); } + /** + * @return symbolic name of the connector plugin + */ public static String name() { return "postgresql"; } + + /** + * @return context name used in log MDC and JMX metrics + */ + public static String contextName() { + return "Postgres"; + } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index 71dd4a695..1b9e6244c 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -908,4 +908,9 @@ private static int validateTableBlacklist(Configuration config, Field field, Fie } return 0; } + + @Override + public String getContextName() { + return Module.contextName(); + } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java index bb3dfd089..d51042c0f 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java @@ -40,7 +40,7 @@ public class PostgresTaskContext extends CdcSourceTaskContext { private Long lastXmin; protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, TopicSelector topicSelector) { - super("Postgres", config.getLogicalName(), Collections::emptySet); + super(config.getContextName(), config.getLogicalName(), Collections::emptySet); this.config = config; if (config.xminFetchInterval().toMillis() > 0) { diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Module.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Module.java index cd358044b..0529addec 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Module.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Module.java @@ -22,7 +22,17 @@ public static String version() { return INFO.getProperty("version"); } + /** + * @return symbolic name of the connector plugin + */ public static String name() { return "sqlserver"; } + + /** + * @return context name used in log MDC and JMX metrics + */ + public static String contextName() { + return "SQL_Server"; + } } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java index a0edb663f..46567ddda 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java @@ -341,4 +341,9 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) { } }; } + + @Override + public String getContextName() { + return Module.contextName(); + } } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerTaskContext.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerTaskContext.java index 3ffba34cc..d7b348d76 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerTaskContext.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerTaskContext.java @@ -16,6 +16,6 @@ public class SqlServerTaskContext extends CdcSourceTaskContext { public SqlServerTaskContext(SqlServerConnectorConfig config, SqlServerDatabaseSchema schema) { - super("SQL_Server", config.getLogicalName(), schema::tableIds); + super(config.getContextName(), config.getLogicalName(), schema::tableIds); } } diff --git a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java index 10731d705..eca983f74 100644 --- a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java @@ -200,6 +200,8 @@ public String getLogicalName() { return logicalName; } + public abstract String getContextName(); + public String getHeartbeatTopicsPrefix() { return heartbeatTopicsPrefix; } diff --git a/debezium-core/src/main/java/io/debezium/connector/common/CdcSourceTaskContext.java b/debezium-core/src/main/java/io/debezium/connector/common/CdcSourceTaskContext.java index 5c89737f3..8e03d133a 100644 --- a/debezium-core/src/main/java/io/debezium/connector/common/CdcSourceTaskContext.java +++ b/debezium-core/src/main/java/io/debezium/connector/common/CdcSourceTaskContext.java @@ -9,9 +9,6 @@ import java.util.Collections; import java.util.function.Supplier; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - import org.apache.kafka.connect.source.SourceTask; import io.debezium.schema.DataCollectionId; @@ -60,20 +57,18 @@ public Clock getClock() { return clock; } - /** - * Create a JMX metric name for the given metric. - * @param contextName the name of the context - * @return the JMX metric name - * @throws MalformedObjectNameException if the name is invalid - */ - public ObjectName metricName(String contextName) throws MalformedObjectNameException { - return new ObjectName("debezium." + connectorType.toLowerCase() + ":type=connector-metrics,context=" + contextName + ",server=" + connectorName); - } - public String[] capturedDataCollections() { return collectionsSupplier.get() .stream() .map(DataCollectionId::toString) .toArray(String[]::new); } + + public String getConnectorType() { + return connectorType; + } + + public String getConnectorName() { + return connectorName; + } } diff --git a/debezium-core/src/main/java/io/debezium/metrics/Metrics.java b/debezium-core/src/main/java/io/debezium/metrics/Metrics.java new file mode 100644 index 000000000..143e754b0 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/metrics/Metrics.java @@ -0,0 +1,86 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.metrics; + +import java.lang.management.ManagementFactory; + +import javax.management.JMException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.kafka.connect.errors.ConnectException; +import org.slf4j.Logger; + +import io.debezium.annotation.ThreadSafe; +import io.debezium.config.CommonConnectorConfig; +import io.debezium.connector.common.CdcSourceTaskContext; + +/** + * Base for metrics implementations. + * + * @author Jiri Pechanec + */ +@ThreadSafe +public abstract class Metrics { + + private final ObjectName name; + + protected Metrics(CdcSourceTaskContext taskContext, String contextName) { + this.name = metricName(taskContext.getConnectorType(), taskContext.getConnectorName(), contextName); + } + + protected Metrics(CommonConnectorConfig connectorConfig, String contextName) { + this.name = metricName(connectorConfig.getContextName(), connectorConfig.getLogicalName(), contextName); + } + + /** + * Registers a metrics MBean into the platform MBean server. + * The method is intentionally synchronized to prevent preemption between registration and unregistration. + */ + public synchronized void register(Logger logger) { + try { + final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + mBeanServer.registerMBean(this, name); + } + catch (JMException e) { + logger.warn("Error while register the MBean '{}': {}", name, e.getMessage()); + } + } + + /** + * Unregisters a metrics MBean from the platform MBean server. + * The method is intentionally synchronized to prevent preemption between registration and unregistration. + */ + public final void unregister(Logger logger) { + if (this.name != null) { + try { + final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + mBeanServer.unregisterMBean(name); + } + catch (JMException e) { + logger.error("Unable to unregister the MBean '{}'", name); + } + } + } + + /** + * Create a JMX metric name for the given metric. + * @param contextName the name of the context + * @return the JMX metric name + * @throws MalformedObjectNameException if the name is invalid + */ + public ObjectName metricName(String connectorType, String connectorName, String contextName) { + final String metricName = "debezium." + connectorType.toLowerCase() + ":type=connector-metrics,context=" + contextName + ",server=" + connectorName; + try { + return new ObjectName(metricName); + } + catch (MalformedObjectNameException e) { + throw new ConnectException("Invalid metric name '" + metricName + "'"); + } + } + +} diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/Metrics.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/PipelineMetrics.java similarity index 61% rename from debezium-core/src/main/java/io/debezium/pipeline/metrics/Metrics.java rename to debezium-core/src/main/java/io/debezium/pipeline/metrics/PipelineMetrics.java index 5aef09c34..eb4fddba9 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/metrics/Metrics.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/PipelineMetrics.java @@ -5,19 +5,14 @@ */ package io.debezium.pipeline.metrics; -import java.lang.management.ManagementFactory; import java.util.concurrent.atomic.AtomicLong; -import javax.management.JMException; -import javax.management.MBeanServer; -import javax.management.ObjectName; - import org.apache.kafka.connect.data.Struct; -import org.slf4j.Logger; import io.debezium.annotation.ThreadSafe; import io.debezium.connector.base.ChangeEventQueueMetrics; import io.debezium.connector.common.CdcSourceTaskContext; +import io.debezium.metrics.Metrics; import io.debezium.pipeline.source.spi.DataChangeEventListener; import io.debezium.pipeline.source.spi.EventMetadataProvider; import io.debezium.pipeline.spi.OffsetContext; @@ -30,7 +25,7 @@ * @author Randall Hauch, Jiri Pechanec */ @ThreadSafe -public abstract class Metrics implements DataChangeEventListener, ChangeEventSourceMetricsMXBean { +public abstract class PipelineMetrics extends Metrics implements DataChangeEventListener, ChangeEventSourceMetricsMXBean { protected final EventMetadataProvider metadataProvider; protected final AtomicLong totalNumberOfEventsSeen = new AtomicLong(); @@ -38,54 +33,18 @@ public abstract class Metrics implements DataChangeEventListener, ChangeEventSou protected final AtomicLong lastEventTimestamp = new AtomicLong(-1); private volatile String lastEvent; - private final String contextName; protected final Clock clock; - protected final CdcSourceTaskContext taskContext; private final ChangeEventQueueMetrics changeEventQueueMetrics; - private volatile ObjectName name; + protected final CdcSourceTaskContext taskContext; - protected Metrics(T taskContext, String contextName, ChangeEventQueueMetrics changeEventQueueMetrics, EventMetadataProvider metadataProvider) { - this.contextName = contextName; + protected PipelineMetrics(T taskContext, String contextName, ChangeEventQueueMetrics changeEventQueueMetrics, EventMetadataProvider metadataProvider) { + super(taskContext, contextName); this.taskContext = taskContext; this.clock = taskContext.getClock(); this.changeEventQueueMetrics = changeEventQueueMetrics; this.metadataProvider = metadataProvider; } - /** - * Registers a metrics MBean into the platform MBean server. - * The method is intentionally synchronized to prevent preemption between registration and unregistration. - */ - public synchronized void register(Logger logger) { - try { - name = taskContext.metricName(this.contextName); - final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - mBeanServer.registerMBean(this, name); - } - catch (JMException e) { - logger.warn("Error while register the MBean '{}': {}", name, e.getMessage()); - } - } - - /** - * Unregisters a metrics MBean from the platform MBean server. - * The method is intentionally synchronized to prevent preemption between registration and unregistration. - */ - public final void unregister(Logger logger) { - if (this.name != null) { - try { - final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - mBeanServer.unregisterMBean(name); - } - catch (JMException e) { - logger.error("Unable to unregister the MBean '{}'", name); - } - finally { - this.name = null; - } - } - } - @Override public void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value) { updateCommonEventMetrics(); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/SnapshotChangeEventSourceMetrics.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/SnapshotChangeEventSourceMetrics.java index 94958835d..9a8c90ef6 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/metrics/SnapshotChangeEventSourceMetrics.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/SnapshotChangeEventSourceMetrics.java @@ -27,7 +27,7 @@ * @author Randall Hauch, Jiri Pechanec */ @ThreadSafe -public class SnapshotChangeEventSourceMetrics extends Metrics implements SnapshotChangeEventSourceMetricsMXBean, SnapshotProgressListener { +public class SnapshotChangeEventSourceMetrics extends PipelineMetrics implements SnapshotChangeEventSourceMetricsMXBean, SnapshotProgressListener { private final AtomicBoolean snapshotRunning = new AtomicBoolean(); private final AtomicBoolean snapshotCompleted = new AtomicBoolean(); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/StreamingChangeEventSourceMetrics.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/StreamingChangeEventSourceMetrics.java index 71dd95a8c..2bda390a7 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/metrics/StreamingChangeEventSourceMetrics.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/StreamingChangeEventSourceMetrics.java @@ -27,7 +27,7 @@ * @author Randall Hauch, Jiri Pechanec */ @ThreadSafe -public class StreamingChangeEventSourceMetrics extends Metrics implements StreamingChangeEventSourceMetricsMXBean, DataChangeEventListener { +public class StreamingChangeEventSourceMetrics extends PipelineMetrics implements StreamingChangeEventSourceMetricsMXBean, DataChangeEventListener { private final AtomicBoolean connected = new AtomicBoolean(); private final AtomicReference lagBehindSource = new AtomicReference<>(); diff --git a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java index 5c8914ec9..39ea6ef46 100644 --- a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java @@ -15,6 +15,7 @@ import io.debezium.relational.Selectors.TableIdToStringMapper; import io.debezium.relational.Tables.TableFilter; import io.debezium.relational.history.DatabaseHistory; +import io.debezium.relational.history.DatabaseHistoryMetrics; import io.debezium.relational.history.HistoryRecordComparator; import io.debezium.relational.history.KafkaDatabaseHistory; @@ -69,7 +70,7 @@ public DatabaseHistory getDatabaseHistory() { .build(); HistoryRecordComparator historyComparator = getHistoryRecordComparator(); - databaseHistory.configure(dbHistoryConfig, historyComparator); // validates + databaseHistory.configure(dbHistoryConfig, historyComparator, new DatabaseHistoryMetrics(this)); // validates return databaseHistory; } diff --git a/debezium-core/src/main/java/io/debezium/relational/history/AbstractDatabaseHistory.java b/debezium-core/src/main/java/io/debezium/relational/history/AbstractDatabaseHistory.java index 67905fefa..14c779edb 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/AbstractDatabaseHistory.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/AbstractDatabaseHistory.java @@ -35,23 +35,25 @@ public abstract class AbstractDatabaseHistory implements DatabaseHistory { private HistoryRecordComparator comparator = HistoryRecordComparator.INSTANCE; private boolean skipUnparseableDDL; private Function> ddlFilter = (x -> Optional.empty()); + private DatabaseHistoryListener listener = DatabaseHistoryListener.NOOP; protected AbstractDatabaseHistory() { } @Override - public void configure(Configuration config, HistoryRecordComparator comparator) { + public void configure(Configuration config, HistoryRecordComparator comparator, DatabaseHistoryListener listener) { this.config = config; this.comparator = comparator != null ? comparator : HistoryRecordComparator.INSTANCE; this.skipUnparseableDDL = config.getBoolean(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS); final String ddlFilter = config.getString(DatabaseHistory.DDL_FILTER); this.ddlFilter = (ddlFilter != null) ? Predicates.matchedBy(ddlFilter) : this.ddlFilter; + this.listener = listener; } @Override public void start() { - // do nothing + listener.started(); } @Override @@ -64,14 +66,18 @@ public final void record(Map source, Map position, String @Override public final void record(Map source, Map position, String databaseName, String schemaName, String ddl, TableChanges changes) throws DatabaseHistoryException { - storeRecord(new HistoryRecord(source, position, databaseName, schemaName, ddl, changes)); + final HistoryRecord record = new HistoryRecord(source, position, databaseName, schemaName, ddl, changes); + storeRecord(record); + listener.onChangeApplied(record); } @Override public final void recover(Map source, Map position, Tables schema, DdlParser ddlParser) { logger.debug("Recovering DDL history for source partition {} and offset {}", source, position); + listener.recoveryStarted(); HistoryRecord stopPoint = new HistoryRecord(source, position, null, null, null, null); recoverRecords(recovered -> { + listener.onChangeFromHistory(recovered); if (comparator.isAtOrBefore(recovered, stopPoint)) { Array tableChanges = recovered.tableChanges(); String ddl = recovered.ddl(); @@ -87,6 +93,7 @@ public final void recover(Map source, Map position, Tables schema.removeTable(entry.getId()); } } + listener.onChangeApplied(recovered); } else if (ddl != null && ddlParser != null) { if (recovered.databaseName() != null) { @@ -103,6 +110,7 @@ else if (ddl != null && ddlParser != null) { try { logger.debug("Applying: {}", ddl); ddlParser.parse(ddl, schema); + listener.onChangeApplied(recovered); } catch (final ParsingException e) { if (skipUnparseableDDL) { logger.warn("Ignoring unparseable statements '{}' stored in database history: {}", ddl, e); @@ -115,6 +123,7 @@ else if (ddl != null && ddlParser != null) { logger.debug("Skipping: {}", recovered.ddl()); } }); + listener.recoveryStopped(); } protected abstract void storeRecord(HistoryRecord record) throws DatabaseHistoryException; @@ -123,7 +132,7 @@ else if (ddl != null && ddlParser != null) { @Override public void stop() { - // do nothing + listener.stopped(); } @Override diff --git a/debezium-core/src/main/java/io/debezium/relational/history/DatabaseHistory.java b/debezium-core/src/main/java/io/debezium/relational/history/DatabaseHistory.java index b7abd3978..316e7b309 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/DatabaseHistory.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/DatabaseHistory.java @@ -78,8 +78,9 @@ public interface DatabaseHistory { * @param comparator the function that should be used to compare history records during * {@link #recover(Map, Map, Tables, DdlParser) recovery}; may be null if the * {@link HistoryRecordComparator#INSTANCE default comparator} is to be used + * @param listener TODO */ - void configure(Configuration config, HistoryRecordComparator comparator); + void configure(Configuration config, HistoryRecordComparator comparator, DatabaseHistoryListener listener); /** * Start the history. @@ -116,7 +117,7 @@ public interface DatabaseHistory { void recover(Map source, Map position, Tables schema, DdlParser ddlParser); /** - * Stop recording history and release any resources acquired since {@link #configure(Configuration, HistoryRecordComparator)}. + * Stop recording history and release any resources acquired since {@link #configure(Configuration, HistoryRecordComparator, DatabaseHistoryListener)}. */ void stop(); diff --git a/debezium-core/src/main/java/io/debezium/relational/history/DatabaseHistoryListener.java b/debezium-core/src/main/java/io/debezium/relational/history/DatabaseHistoryListener.java new file mode 100644 index 000000000..57d384c4c --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/history/DatabaseHistoryListener.java @@ -0,0 +1,59 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.history; + +/** + * Listener receiving lifecycle and data events from {@link DatabaseHistory}. + * + * @author Jiri Pechanec + * + */ +public interface DatabaseHistoryListener { + public void started(); + public void stopped(); + public void recoveryStarted(); + public void recoveryStopped(); + + /** + * Invoked for every change read from the history during recovery. + * + * @param record + */ + public void onChangeFromHistory(HistoryRecord record); + + /** + * Invoked for every change applied and not filtered. + * + * @param record + */ + public void onChangeApplied(HistoryRecord record); + + static DatabaseHistoryListener NOOP = new DatabaseHistoryListener() { + @Override + public void stopped() { + } + + @Override + public void started() { + } + + @Override + public void recoveryStopped() { + } + + @Override + public void recoveryStarted() { + } + + @Override + public void onChangeFromHistory(HistoryRecord record) { + } + + @Override + public void onChangeApplied(HistoryRecord record) { + } + }; +} diff --git a/debezium-core/src/main/java/io/debezium/relational/history/DatabaseHistoryMXBean.java b/debezium-core/src/main/java/io/debezium/relational/history/DatabaseHistoryMXBean.java new file mode 100644 index 000000000..d3425c78e --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/history/DatabaseHistoryMXBean.java @@ -0,0 +1,60 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.history; + +/** + * Metrics describing {@link DatabaseHistory} use. + * @author Jiri Pechanec + * + */ +public interface DatabaseHistoryMXBean { + + /** + * The database history starts in {@code STOPPED} state. + * Upon start it transitions to {@code RECOVERING} state. + * When all changes from stored history were applied then it switches to {@code RUNNING} state. + *

Maps to {@link DatabaseHistoryMetrics.DatabaseHistoryStatus} enum. + * + * @return database history component state + */ + String getStatus(); + + /** + * @return time in epoch seconds when recovery has started + */ + long getRecoveryStartTime(); + + /** + * @return number of changes that were read during recovery phase + */ + long getChangesRecovered(); + + /** + * @return number of changes that were applied during recovery phase increased by number of changes + * applied during runtime + */ + long getChangesApplied(); + + /** + * @return elapsed time in milliseconds since the last change was applied + */ + long getMilliSecondsSinceLastAppliedChange(); + + /** + * @return elapsed time in milliseconds since the last record was recovered from history + */ + long getMilliSecondsSinceLastRecoveredChange(); + + /** + * @return String representation of the last applied change + */ + String getLastAppliedChange(); + + /** + * @return String representation of the last recovered change + */ + String getLastRecoveredChange(); +} diff --git a/debezium-core/src/main/java/io/debezium/relational/history/DatabaseHistoryMetrics.java b/debezium-core/src/main/java/io/debezium/relational/history/DatabaseHistoryMetrics.java new file mode 100644 index 000000000..c6b9a1a6a --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/history/DatabaseHistoryMetrics.java @@ -0,0 +1,138 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.history; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.config.CommonConnectorConfig; +import io.debezium.connector.common.CdcSourceTaskContext; +import io.debezium.metrics.Metrics; +import io.debezium.schema.DatabaseSchema; + +/** + * Implementation of {@link DatabaseSchema} metrics. + * + * @author Jiri Pechanec + * + */ +public class DatabaseHistoryMetrics extends Metrics implements DatabaseHistoryListener, DatabaseHistoryMXBean { + + private static final String CONTEXT_NAME = "schema-history"; + + private static final Duration PAUSE_BETWEEN_LOG_MESSAGES = Duration.ofSeconds(2); + + private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseHistoryMetrics.class); + + public static enum DatabaseHistoryStatus { + STOPPED, RECOVERING, RUNNING + } + + private DatabaseHistoryStatus status = DatabaseHistoryStatus.STOPPED; + private Instant recoveryStartTime = null; + private AtomicLong changesRecovered = new AtomicLong(); + private AtomicLong totalChangesApplied = new AtomicLong(); + private Instant lastChangeAppliedTimestamp; + private Instant lastChangeRecoveredTimestamp; + private HistoryRecord lastAppliedChange; + private HistoryRecord lastRecoveredChange; + + protected DatabaseHistoryMetrics(T taskContext, String contextName) { + super(taskContext, contextName); + } + + public DatabaseHistoryMetrics(CommonConnectorConfig connectorConfig) { + super(connectorConfig, CONTEXT_NAME); + } + + @Override + public String getStatus() { + return status.toString(); + } + + @Override + public long getRecoveryStartTime() { + return recoveryStartTime == null ? -1 : recoveryStartTime.getEpochSecond(); + } + + @Override + public long getChangesRecovered() { + return changesRecovered.get(); + } + + @Override + public long getChangesApplied() { + return totalChangesApplied.get(); + } + + @Override + public long getMilliSecondsSinceLastAppliedChange() { + return lastChangeAppliedTimestamp == null ? -1 : Duration.between(lastChangeAppliedTimestamp, Instant.now()).toMillis(); + } + + @Override + public long getMilliSecondsSinceLastRecoveredChange() { + return lastChangeRecoveredTimestamp == null ? -1 : Duration.between(lastChangeRecoveredTimestamp, Instant.now()).toMillis(); + } + + @Override + public String getLastAppliedChange() { + return lastAppliedChange == null ? "" : lastAppliedChange.toString(); + } + + @Override + public String getLastRecoveredChange() { + return lastRecoveredChange == null ? "" : lastRecoveredChange.toString(); + } + + @Override + public void started() { + status = DatabaseHistoryStatus.RUNNING; + register(LOGGER); + } + + @Override + public void stopped() { + status = DatabaseHistoryStatus.STOPPED; + unregister(LOGGER); + } + + @Override + public void recoveryStarted() { + status = DatabaseHistoryStatus.RECOVERING; + recoveryStartTime = Instant.now(); + } + + @Override + public void recoveryStopped() { + status = DatabaseHistoryStatus.RUNNING; + } + + @Override + public void onChangeFromHistory(HistoryRecord record) { + lastRecoveredChange = record; + changesRecovered.incrementAndGet(); + if (getMilliSecondsSinceLastRecoveredChange() >= PAUSE_BETWEEN_LOG_MESSAGES.toMillis()) { + LOGGER.info("Database history recovery in progress, recovered {} records", changesRecovered); + } + lastChangeRecoveredTimestamp = Instant.now(); + } + + @Override + public void onChangeApplied(HistoryRecord record) { + lastAppliedChange = record; + totalChangesApplied.incrementAndGet(); + if (getMilliSecondsSinceLastAppliedChange() >= PAUSE_BETWEEN_LOG_MESSAGES.toMillis()) { + LOGGER.info("Already applied {} database changes", totalChangesApplied); + } + lastChangeAppliedTimestamp = Instant.now(); + } + +} diff --git a/debezium-core/src/main/java/io/debezium/relational/history/FileDatabaseHistory.java b/debezium-core/src/main/java/io/debezium/relational/history/FileDatabaseHistory.java index 3635e9332..4b4cada7e 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/FileDatabaseHistory.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/FileDatabaseHistory.java @@ -50,7 +50,7 @@ public final class FileDatabaseHistory extends AbstractDatabaseHistory { private Path path; @Override - public void configure(Configuration config, HistoryRecordComparator comparator) { + public void configure(Configuration config, HistoryRecordComparator comparator, DatabaseHistoryListener listener) { if (!config.validateAndRecord(ALL_FIELDS, logger::error)) { throw new ConnectException( "Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details"); @@ -59,12 +59,13 @@ public void configure(Configuration config, HistoryRecordComparator comparator) if (running.get()) { throw new IllegalStateException("Database history file already initialized to " + path); } - super.configure(config, comparator); + super.configure(config, comparator, listener); path = Paths.get(config.getString(FILE_PATH)); } @Override public void start() { + super.start(); lock.write(() -> { if (running.compareAndSet(false, true)) { Path path = this.path; @@ -126,6 +127,7 @@ protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException @Override public void stop() { running.set(false); + super.stop(); } @Override diff --git a/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java b/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java index 542d615ca..7f4756247 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java @@ -130,8 +130,8 @@ public class KafkaDatabaseHistory extends AbstractDatabaseHistory { private Duration pollInterval; @Override - public void configure(Configuration config, HistoryRecordComparator comparator) { - super.configure(config, comparator); + public void configure(Configuration config, HistoryRecordComparator comparator, DatabaseHistoryListener listener) { + super.configure(config, comparator, listener); if (!config.validateAndRecord(ALL_FIELDS, logger::error)) { throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details"); } diff --git a/debezium-core/src/test/java/io/debezium/relational/history/FileDatabaseHistoryTest.java b/debezium-core/src/test/java/io/debezium/relational/history/FileDatabaseHistoryTest.java index 4c4b581de..ac2a5786c 100644 --- a/debezium-core/src/test/java/io/debezium/relational/history/FileDatabaseHistoryTest.java +++ b/debezium-core/src/test/java/io/debezium/relational/history/FileDatabaseHistoryTest.java @@ -32,7 +32,7 @@ protected DatabaseHistory createHistory() { DatabaseHistory history = new FileDatabaseHistory(); history.configure(Configuration.create() .with(FileDatabaseHistory.FILE_PATH, TEST_FILE_PATH.toAbsolutePath().toString()) - .build(), null); + .build(), null, DatabaseHistoryMetrics.NOOP); history.start(); return history; } diff --git a/debezium-core/src/test/java/io/debezium/relational/history/KafkaDatabaseHistoryTest.java b/debezium-core/src/test/java/io/debezium/relational/history/KafkaDatabaseHistoryTest.java index ea47ace68..f10434a44 100644 --- a/debezium-core/src/test/java/io/debezium/relational/history/KafkaDatabaseHistoryTest.java +++ b/debezium-core/src/test/java/io/debezium/relational/history/KafkaDatabaseHistoryTest.java @@ -109,7 +109,7 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL) { 50000) .with(KafkaDatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL) .build(); - history.configure(config, null); + history.configure(config, null, DatabaseHistoryMetrics.NOOP); history.start(); // Should be able to call start more than once ... @@ -168,7 +168,7 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL) { // Stop the history (which should stop the producer) ... history.stop(); history = new KafkaDatabaseHistory(); - history.configure(config, null); + history.configure(config, null, DatabaseHistoryListener.NOOP); // no need to start // Recover from the very beginning to just past the first change ... @@ -285,7 +285,7 @@ public void testExists() { .with(KafkaDatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true) .build(); - history.configure(config, null); + history.configure(config, null, DatabaseHistoryMetrics.NOOP); history.start(); // dummytopic should not exist yet