DBZ-1356 Metrics for schema history (#947)

* DBZ-1356 Metrics for schema history
This commit is contained in:
Jiri Pechanec 2019-07-02 13:26:17 +02:00 committed by Gunnar Morling
parent b1c1c05d1f
commit 13d638fd43
31 changed files with 457 additions and 84 deletions

View File

@ -22,7 +22,17 @@ public static String version() {
return INFO.getProperty("version");
}
/**
* @return symbolic name of the connector plugin
*/
public static String name() {
return "mongodb";
}
/**
* @return context name used in log MDC and JMX metrics
*/
public static String contextName() {
return "MongoDB";
}
}

View File

@ -391,4 +391,9 @@ protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStruc
return new MongoDbSourceInfoStructMaker(Module.name(), Module.version(), this);
}
}
@Override
public String getContextName() {
return Module.contextName();
}
}

View File

@ -28,7 +28,7 @@ public class MongoDbTaskContext extends CdcSourceTaskContext {
* @param config the configuration
*/
public MongoDbTaskContext(Configuration config) {
super("MongoDB", config.getString(MongoDbConnectorConfig.LOGICAL_NAME), Collections::emptySet);
super(Module.contextName(), config.getString(MongoDbConnectorConfig.LOGICAL_NAME), Collections::emptySet);
final String serverName = config.getString(MongoDbConnectorConfig.LOGICAL_NAME);
this.filters = new Filters(config);

View File

@ -14,13 +14,13 @@
import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientStatistics;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.pipeline.metrics.Metrics;
import io.debezium.pipeline.metrics.PipelineMetrics;
import io.debezium.util.Collect;
/**
* @author Randall Hauch
*/
class BinlogReaderMetrics extends Metrics implements BinlogReaderMetricsMXBean {
class BinlogReaderMetrics extends PipelineMetrics implements BinlogReaderMetricsMXBean {
private final BinaryLogClient client;
private final BinaryLogClientStatistics stats;

View File

@ -22,7 +22,17 @@ public static String version() {
return INFO.getProperty("version");
}
/**
* @return symbolic name of the connector plugin
*/
public static String name() {
return "mysql";
}
/**
* @return context name used in log MDC and JMX metrics
*/
public static String contextName() {
return "MySQL";
}
}

View File

@ -1191,4 +1191,9 @@ protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStruc
return new MySqlSourceInfoStructMaker(Module.name(), Module.version(), this);
}
}
@Override
public String getContextName() {
return Module.contextName();
}
}

View File

@ -63,7 +63,7 @@ public String version() {
@Override
public synchronized void start(Configuration config) {
final String serverName = config.getString(MySqlConnectorConfig.SERVER_NAME);
PreviousContext prevLoggingContext = LoggingContext.forConnector("MySQL", serverName, "task");
PreviousContext prevLoggingContext = LoggingContext.forConnector(Module.contextName(), serverName, "task");
try {
// Get the offsets for our partition ...

View File

@ -35,6 +35,7 @@
import io.debezium.relational.ddl.DdlChanges.DatabaseStatementStringConsumer;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryMetrics;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.schema.TopicSelector;
import io.debezium.text.MultipleParsingExceptions;
@ -129,7 +130,7 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) {
return SourceInfo.isPositionAtOrBefore(recorded, desired, gtidFilter);
}
};
this.dbHistory.configure(dbHistoryConfig, historyComparator); // validates
this.dbHistory.configure(dbHistoryConfig, historyComparator, new DatabaseHistoryMetrics(configuration)); // validates
}

View File

@ -57,7 +57,7 @@ public MySqlTaskContext(Configuration config, Filters filters, Map<String, ?> re
}
public MySqlTaskContext(Configuration config, Filters filters, Boolean tableIdCaseInsensitive, Map<String, ?> restartOffset) {
super("MySQL", config.getString(MySqlConnectorConfig.SERVER_NAME), Collections::emptyList);
super(Module.contextName(), config.getString(MySqlConnectorConfig.SERVER_NAME), Collections::emptyList);
this.config = config;
this.connectorConfig = new MySqlConnectorConfig(config);

View File

@ -22,7 +22,17 @@ public static String version() {
return INFO.getProperty("version");
}
/**
* @return symbolic name of the connector plugin
*/
public static String name() {
return "postgresql";
}
/**
* @return context name used in log MDC and JMX metrics
*/
public static String contextName() {
return "Postgres";
}
}

View File

@ -908,4 +908,9 @@ private static int validateTableBlacklist(Configuration config, Field field, Fie
}
return 0;
}
@Override
public String getContextName() {
return Module.contextName();
}
}

View File

@ -40,7 +40,7 @@ public class PostgresTaskContext extends CdcSourceTaskContext {
private Long lastXmin;
protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, TopicSelector<TableId> topicSelector) {
super("Postgres", config.getLogicalName(), Collections::emptySet);
super(config.getContextName(), config.getLogicalName(), Collections::emptySet);
this.config = config;
if (config.xminFetchInterval().toMillis() > 0) {

View File

@ -22,7 +22,17 @@ public static String version() {
return INFO.getProperty("version");
}
/**
* @return symbolic name of the connector plugin
*/
public static String name() {
return "sqlserver";
}
/**
* @return context name used in log MDC and JMX metrics
*/
public static String contextName() {
return "SQL_Server";
}
}

View File

@ -341,4 +341,9 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) {
}
};
}
@Override
public String getContextName() {
return Module.contextName();
}
}

View File

@ -16,6 +16,6 @@
public class SqlServerTaskContext extends CdcSourceTaskContext {
public SqlServerTaskContext(SqlServerConnectorConfig config, SqlServerDatabaseSchema schema) {
super("SQL_Server", config.getLogicalName(), schema::tableIds);
super(config.getContextName(), config.getLogicalName(), schema::tableIds);
}
}

View File

@ -200,6 +200,8 @@ public String getLogicalName() {
return logicalName;
}
public abstract String getContextName();
public String getHeartbeatTopicsPrefix() {
return heartbeatTopicsPrefix;
}

View File

@ -9,9 +9,6 @@
import java.util.Collections;
import java.util.function.Supplier;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.kafka.connect.source.SourceTask;
import io.debezium.schema.DataCollectionId;
@ -60,20 +57,18 @@ public Clock getClock() {
return clock;
}
/**
* Create a JMX metric name for the given metric.
* @param contextName the name of the context
* @return the JMX metric name
* @throws MalformedObjectNameException if the name is invalid
*/
public ObjectName metricName(String contextName) throws MalformedObjectNameException {
return new ObjectName("debezium." + connectorType.toLowerCase() + ":type=connector-metrics,context=" + contextName + ",server=" + connectorName);
}
public String[] capturedDataCollections() {
return collectionsSupplier.get()
.stream()
.map(DataCollectionId::toString)
.toArray(String[]::new);
}
public String getConnectorType() {
return connectorType;
}
public String getConnectorName() {
return connectorName;
}
}

View File

@ -0,0 +1,86 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.metrics;
import java.lang.management.ManagementFactory;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import io.debezium.annotation.ThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.common.CdcSourceTaskContext;
/**
* Base for metrics implementations.
*
* @author Jiri Pechanec
*/
@ThreadSafe
public abstract class Metrics {
private final ObjectName name;
protected Metrics(CdcSourceTaskContext taskContext, String contextName) {
this.name = metricName(taskContext.getConnectorType(), taskContext.getConnectorName(), contextName);
}
protected Metrics(CommonConnectorConfig connectorConfig, String contextName) {
this.name = metricName(connectorConfig.getContextName(), connectorConfig.getLogicalName(), contextName);
}
/**
* Registers a metrics MBean into the platform MBean server.
* The method is intentionally synchronized to prevent preemption between registration and unregistration.
*/
public synchronized void register(Logger logger) {
try {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
mBeanServer.registerMBean(this, name);
}
catch (JMException e) {
logger.warn("Error while register the MBean '{}': {}", name, e.getMessage());
}
}
/**
* Unregisters a metrics MBean from the platform MBean server.
* The method is intentionally synchronized to prevent preemption between registration and unregistration.
*/
public final void unregister(Logger logger) {
if (this.name != null) {
try {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
mBeanServer.unregisterMBean(name);
}
catch (JMException e) {
logger.error("Unable to unregister the MBean '{}'", name);
}
}
}
/**
* Create a JMX metric name for the given metric.
* @param contextName the name of the context
* @return the JMX metric name
* @throws MalformedObjectNameException if the name is invalid
*/
public ObjectName metricName(String connectorType, String connectorName, String contextName) {
final String metricName = "debezium." + connectorType.toLowerCase() + ":type=connector-metrics,context=" + contextName + ",server=" + connectorName;
try {
return new ObjectName(metricName);
}
catch (MalformedObjectNameException e) {
throw new ConnectException("Invalid metric name '" + metricName + "'");
}
}
}

View File

@ -5,19 +5,14 @@
*/
package io.debezium.pipeline.metrics;
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.metrics.Metrics;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
@ -30,7 +25,7 @@
* @author Randall Hauch, Jiri Pechanec
*/
@ThreadSafe
public abstract class Metrics implements DataChangeEventListener, ChangeEventSourceMetricsMXBean {
public abstract class PipelineMetrics extends Metrics implements DataChangeEventListener, ChangeEventSourceMetricsMXBean {
protected final EventMetadataProvider metadataProvider;
protected final AtomicLong totalNumberOfEventsSeen = new AtomicLong();
@ -38,54 +33,18 @@ public abstract class Metrics implements DataChangeEventListener, ChangeEventSou
protected final AtomicLong lastEventTimestamp = new AtomicLong(-1);
private volatile String lastEvent;
private final String contextName;
protected final Clock clock;
protected final CdcSourceTaskContext taskContext;
private final ChangeEventQueueMetrics changeEventQueueMetrics;
private volatile ObjectName name;
protected final CdcSourceTaskContext taskContext;
protected <T extends CdcSourceTaskContext> Metrics(T taskContext, String contextName, ChangeEventQueueMetrics changeEventQueueMetrics, EventMetadataProvider metadataProvider) {
this.contextName = contextName;
protected <T extends CdcSourceTaskContext> PipelineMetrics(T taskContext, String contextName, ChangeEventQueueMetrics changeEventQueueMetrics, EventMetadataProvider metadataProvider) {
super(taskContext, contextName);
this.taskContext = taskContext;
this.clock = taskContext.getClock();
this.changeEventQueueMetrics = changeEventQueueMetrics;
this.metadataProvider = metadataProvider;
}
/**
* Registers a metrics MBean into the platform MBean server.
* The method is intentionally synchronized to prevent preemption between registration and unregistration.
*/
public synchronized <T extends CdcSourceTaskContext> void register(Logger logger) {
try {
name = taskContext.metricName(this.contextName);
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
mBeanServer.registerMBean(this, name);
}
catch (JMException e) {
logger.warn("Error while register the MBean '{}': {}", name, e.getMessage());
}
}
/**
* Unregisters a metrics MBean from the platform MBean server.
* The method is intentionally synchronized to prevent preemption between registration and unregistration.
*/
public final void unregister(Logger logger) {
if (this.name != null) {
try {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
mBeanServer.unregisterMBean(name);
}
catch (JMException e) {
logger.error("Unable to unregister the MBean '{}'", name);
}
finally {
this.name = null;
}
}
}
@Override
public void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
updateCommonEventMetrics();

View File

@ -27,7 +27,7 @@
* @author Randall Hauch, Jiri Pechanec
*/
@ThreadSafe
public class SnapshotChangeEventSourceMetrics extends Metrics implements SnapshotChangeEventSourceMetricsMXBean, SnapshotProgressListener {
public class SnapshotChangeEventSourceMetrics extends PipelineMetrics implements SnapshotChangeEventSourceMetricsMXBean, SnapshotProgressListener {
private final AtomicBoolean snapshotRunning = new AtomicBoolean();
private final AtomicBoolean snapshotCompleted = new AtomicBoolean();

View File

@ -27,7 +27,7 @@
* @author Randall Hauch, Jiri Pechanec
*/
@ThreadSafe
public class StreamingChangeEventSourceMetrics extends Metrics implements StreamingChangeEventSourceMetricsMXBean, DataChangeEventListener {
public class StreamingChangeEventSourceMetrics extends PipelineMetrics implements StreamingChangeEventSourceMetricsMXBean, DataChangeEventListener {
private final AtomicBoolean connected = new AtomicBoolean();
private final AtomicReference<Duration> lagBehindSource = new AtomicReference<>();

View File

@ -15,6 +15,7 @@
import io.debezium.relational.Selectors.TableIdToStringMapper;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryMetrics;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.KafkaDatabaseHistory;
@ -69,7 +70,7 @@ public DatabaseHistory getDatabaseHistory() {
.build();
HistoryRecordComparator historyComparator = getHistoryRecordComparator();
databaseHistory.configure(dbHistoryConfig, historyComparator); // validates
databaseHistory.configure(dbHistoryConfig, historyComparator, new DatabaseHistoryMetrics(this)); // validates
return databaseHistory;
}

View File

@ -35,23 +35,25 @@ public abstract class AbstractDatabaseHistory implements DatabaseHistory {
private HistoryRecordComparator comparator = HistoryRecordComparator.INSTANCE;
private boolean skipUnparseableDDL;
private Function<String, Optional<Pattern>> ddlFilter = (x -> Optional.empty());
private DatabaseHistoryListener listener = DatabaseHistoryListener.NOOP;
protected AbstractDatabaseHistory() {
}
@Override
public void configure(Configuration config, HistoryRecordComparator comparator) {
public void configure(Configuration config, HistoryRecordComparator comparator, DatabaseHistoryListener listener) {
this.config = config;
this.comparator = comparator != null ? comparator : HistoryRecordComparator.INSTANCE;
this.skipUnparseableDDL = config.getBoolean(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS);
final String ddlFilter = config.getString(DatabaseHistory.DDL_FILTER);
this.ddlFilter = (ddlFilter != null) ? Predicates.matchedBy(ddlFilter) : this.ddlFilter;
this.listener = listener;
}
@Override
public void start() {
// do nothing
listener.started();
}
@Override
@ -64,14 +66,18 @@ public final void record(Map<String, ?> source, Map<String, ?> position, String
@Override
public final void record(Map<String, ?> source, Map<String, ?> position, String databaseName, String schemaName, String ddl, TableChanges changes)
throws DatabaseHistoryException {
storeRecord(new HistoryRecord(source, position, databaseName, schemaName, ddl, changes));
final HistoryRecord record = new HistoryRecord(source, position, databaseName, schemaName, ddl, changes);
storeRecord(record);
listener.onChangeApplied(record);
}
@Override
public final void recover(Map<String, ?> source, Map<String, ?> position, Tables schema, DdlParser ddlParser) {
logger.debug("Recovering DDL history for source partition {} and offset {}", source, position);
listener.recoveryStarted();
HistoryRecord stopPoint = new HistoryRecord(source, position, null, null, null, null);
recoverRecords(recovered -> {
listener.onChangeFromHistory(recovered);
if (comparator.isAtOrBefore(recovered, stopPoint)) {
Array tableChanges = recovered.tableChanges();
String ddl = recovered.ddl();
@ -87,6 +93,7 @@ public final void recover(Map<String, ?> source, Map<String, ?> position, Tables
schema.removeTable(entry.getId());
}
}
listener.onChangeApplied(recovered);
}
else if (ddl != null && ddlParser != null) {
if (recovered.databaseName() != null) {
@ -103,6 +110,7 @@ else if (ddl != null && ddlParser != null) {
try {
logger.debug("Applying: {}", ddl);
ddlParser.parse(ddl, schema);
listener.onChangeApplied(recovered);
} catch (final ParsingException e) {
if (skipUnparseableDDL) {
logger.warn("Ignoring unparseable statements '{}' stored in database history: {}", ddl, e);
@ -115,6 +123,7 @@ else if (ddl != null && ddlParser != null) {
logger.debug("Skipping: {}", recovered.ddl());
}
});
listener.recoveryStopped();
}
protected abstract void storeRecord(HistoryRecord record) throws DatabaseHistoryException;
@ -123,7 +132,7 @@ else if (ddl != null && ddlParser != null) {
@Override
public void stop() {
// do nothing
listener.stopped();
}
@Override

View File

@ -78,8 +78,9 @@ public interface DatabaseHistory {
* @param comparator the function that should be used to compare history records during
* {@link #recover(Map, Map, Tables, DdlParser) recovery}; may be null if the
* {@link HistoryRecordComparator#INSTANCE default comparator} is to be used
* @param listener TODO
*/
void configure(Configuration config, HistoryRecordComparator comparator);
void configure(Configuration config, HistoryRecordComparator comparator, DatabaseHistoryListener listener);
/**
* Start the history.
@ -116,7 +117,7 @@ public interface DatabaseHistory {
void recover(Map<String, ?> source, Map<String, ?> position, Tables schema, DdlParser ddlParser);
/**
* Stop recording history and release any resources acquired since {@link #configure(Configuration, HistoryRecordComparator)}.
* Stop recording history and release any resources acquired since {@link #configure(Configuration, HistoryRecordComparator, DatabaseHistoryListener)}.
*/
void stop();

View File

@ -0,0 +1,59 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational.history;
/**
* Listener receiving lifecycle and data events from {@link DatabaseHistory}.
*
* @author Jiri Pechanec
*
*/
public interface DatabaseHistoryListener {
public void started();
public void stopped();
public void recoveryStarted();
public void recoveryStopped();
/**
* Invoked for every change read from the history during recovery.
*
* @param record
*/
public void onChangeFromHistory(HistoryRecord record);
/**
* Invoked for every change applied and not filtered.
*
* @param record
*/
public void onChangeApplied(HistoryRecord record);
static DatabaseHistoryListener NOOP = new DatabaseHistoryListener() {
@Override
public void stopped() {
}
@Override
public void started() {
}
@Override
public void recoveryStopped() {
}
@Override
public void recoveryStarted() {
}
@Override
public void onChangeFromHistory(HistoryRecord record) {
}
@Override
public void onChangeApplied(HistoryRecord record) {
}
};
}

View File

@ -0,0 +1,60 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational.history;
/**
* Metrics describing {@link DatabaseHistory} use.
* @author Jiri Pechanec
*
*/
public interface DatabaseHistoryMXBean {
/**
* The database history starts in {@code STOPPED} state.
* Upon start it transitions to {@code RECOVERING} state.
* When all changes from stored history were applied then it switches to {@code RUNNING} state.
* <p>Maps to {@link DatabaseHistoryMetrics.DatabaseHistoryStatus} enum.
*
* @return database history component state
*/
String getStatus();
/**
* @return time in epoch seconds when recovery has started
*/
long getRecoveryStartTime();
/**
* @return number of changes that were read during recovery phase
*/
long getChangesRecovered();
/**
* @return number of changes that were applied during recovery phase increased by number of changes
* applied during runtime
*/
long getChangesApplied();
/**
* @return elapsed time in milliseconds since the last change was applied
*/
long getMilliSecondsSinceLastAppliedChange();
/**
* @return elapsed time in milliseconds since the last record was recovered from history
*/
long getMilliSecondsSinceLastRecoveredChange();
/**
* @return String representation of the last applied change
*/
String getLastAppliedChange();
/**
* @return String representation of the last recovered change
*/
String getLastRecoveredChange();
}

View File

@ -0,0 +1,138 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational.history;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.metrics.Metrics;
import io.debezium.schema.DatabaseSchema;
/**
* Implementation of {@link DatabaseSchema} metrics.
*
* @author Jiri Pechanec
*
*/
public class DatabaseHistoryMetrics extends Metrics implements DatabaseHistoryListener, DatabaseHistoryMXBean {
private static final String CONTEXT_NAME = "schema-history";
private static final Duration PAUSE_BETWEEN_LOG_MESSAGES = Duration.ofSeconds(2);
private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseHistoryMetrics.class);
public static enum DatabaseHistoryStatus {
STOPPED, RECOVERING, RUNNING
}
private DatabaseHistoryStatus status = DatabaseHistoryStatus.STOPPED;
private Instant recoveryStartTime = null;
private AtomicLong changesRecovered = new AtomicLong();
private AtomicLong totalChangesApplied = new AtomicLong();
private Instant lastChangeAppliedTimestamp;
private Instant lastChangeRecoveredTimestamp;
private HistoryRecord lastAppliedChange;
private HistoryRecord lastRecoveredChange;
protected <T extends CdcSourceTaskContext> DatabaseHistoryMetrics(T taskContext, String contextName) {
super(taskContext, contextName);
}
public DatabaseHistoryMetrics(CommonConnectorConfig connectorConfig) {
super(connectorConfig, CONTEXT_NAME);
}
@Override
public String getStatus() {
return status.toString();
}
@Override
public long getRecoveryStartTime() {
return recoveryStartTime == null ? -1 : recoveryStartTime.getEpochSecond();
}
@Override
public long getChangesRecovered() {
return changesRecovered.get();
}
@Override
public long getChangesApplied() {
return totalChangesApplied.get();
}
@Override
public long getMilliSecondsSinceLastAppliedChange() {
return lastChangeAppliedTimestamp == null ? -1 : Duration.between(lastChangeAppliedTimestamp, Instant.now()).toMillis();
}
@Override
public long getMilliSecondsSinceLastRecoveredChange() {
return lastChangeRecoveredTimestamp == null ? -1 : Duration.between(lastChangeRecoveredTimestamp, Instant.now()).toMillis();
}
@Override
public String getLastAppliedChange() {
return lastAppliedChange == null ? "" : lastAppliedChange.toString();
}
@Override
public String getLastRecoveredChange() {
return lastRecoveredChange == null ? "" : lastRecoveredChange.toString();
}
@Override
public void started() {
status = DatabaseHistoryStatus.RUNNING;
register(LOGGER);
}
@Override
public void stopped() {
status = DatabaseHistoryStatus.STOPPED;
unregister(LOGGER);
}
@Override
public void recoveryStarted() {
status = DatabaseHistoryStatus.RECOVERING;
recoveryStartTime = Instant.now();
}
@Override
public void recoveryStopped() {
status = DatabaseHistoryStatus.RUNNING;
}
@Override
public void onChangeFromHistory(HistoryRecord record) {
lastRecoveredChange = record;
changesRecovered.incrementAndGet();
if (getMilliSecondsSinceLastRecoveredChange() >= PAUSE_BETWEEN_LOG_MESSAGES.toMillis()) {
LOGGER.info("Database history recovery in progress, recovered {} records", changesRecovered);
}
lastChangeRecoveredTimestamp = Instant.now();
}
@Override
public void onChangeApplied(HistoryRecord record) {
lastAppliedChange = record;
totalChangesApplied.incrementAndGet();
if (getMilliSecondsSinceLastAppliedChange() >= PAUSE_BETWEEN_LOG_MESSAGES.toMillis()) {
LOGGER.info("Already applied {} database changes", totalChangesApplied);
}
lastChangeAppliedTimestamp = Instant.now();
}
}

View File

@ -50,7 +50,7 @@ public final class FileDatabaseHistory extends AbstractDatabaseHistory {
private Path path;
@Override
public void configure(Configuration config, HistoryRecordComparator comparator) {
public void configure(Configuration config, HistoryRecordComparator comparator, DatabaseHistoryListener listener) {
if (!config.validateAndRecord(ALL_FIELDS, logger::error)) {
throw new ConnectException(
"Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
@ -59,12 +59,13 @@ public void configure(Configuration config, HistoryRecordComparator comparator)
if (running.get()) {
throw new IllegalStateException("Database history file already initialized to " + path);
}
super.configure(config, comparator);
super.configure(config, comparator, listener);
path = Paths.get(config.getString(FILE_PATH));
}
@Override
public void start() {
super.start();
lock.write(() -> {
if (running.compareAndSet(false, true)) {
Path path = this.path;
@ -126,6 +127,7 @@ protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException
@Override
public void stop() {
running.set(false);
super.stop();
}
@Override

View File

@ -130,8 +130,8 @@ public class KafkaDatabaseHistory extends AbstractDatabaseHistory {
private Duration pollInterval;
@Override
public void configure(Configuration config, HistoryRecordComparator comparator) {
super.configure(config, comparator);
public void configure(Configuration config, HistoryRecordComparator comparator, DatabaseHistoryListener listener) {
super.configure(config, comparator, listener);
if (!config.validateAndRecord(ALL_FIELDS, logger::error)) {
throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
}

View File

@ -32,7 +32,7 @@ protected DatabaseHistory createHistory() {
DatabaseHistory history = new FileDatabaseHistory();
history.configure(Configuration.create()
.with(FileDatabaseHistory.FILE_PATH, TEST_FILE_PATH.toAbsolutePath().toString())
.build(), null);
.build(), null, DatabaseHistoryMetrics.NOOP);
history.start();
return history;
}

View File

@ -109,7 +109,7 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL) {
50000)
.with(KafkaDatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL)
.build();
history.configure(config, null);
history.configure(config, null, DatabaseHistoryMetrics.NOOP);
history.start();
// Should be able to call start more than once ...
@ -168,7 +168,7 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL) {
// Stop the history (which should stop the producer) ...
history.stop();
history = new KafkaDatabaseHistory();
history.configure(config, null);
history.configure(config, null, DatabaseHistoryListener.NOOP);
// no need to start
// Recover from the very beginning to just past the first change ...
@ -285,7 +285,7 @@ public void testExists() {
.with(KafkaDatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true)
.build();
history.configure(config, null);
history.configure(config, null, DatabaseHistoryMetrics.NOOP);
history.start();
// dummytopic should not exist yet