DBZ-220 Refactor heartbeat to a separate class

This commit is contained in:
Jiri Pechanec 2018-01-30 12:21:32 +01:00 committed by Gunnar Morling
parent 4a22e17b71
commit 3c1ea8206b
7 changed files with 179 additions and 84 deletions

View File

@ -7,7 +7,6 @@
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.BitSet;
import java.util.EnumMap;
import java.util.HashMap;
@ -47,12 +46,13 @@
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
import io.debezium.connector.mysql.RecordMakers.RecordsForTable;
import io.debezium.function.BlockingConsumer;
import io.debezium.heartbeat.HeartbeatController;
import io.debezium.heartbeat.OffsetPosition;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.ElapsedTimeStrategy;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import io.debezium.util.Threads.Timer;
/**
* A component that reads the binlog of a MySQL server, and records any schema changes in {@link MySqlSchema}.
@ -86,8 +86,7 @@ public class BinlogReader extends AbstractReader {
private final AtomicLong totalRecordCounter = new AtomicLong();
private volatile Map<String, ?> lastOffset = null;
private com.github.shyiko.mysql.binlog.GtidSet gtidSet;
private Duration heartbeatInterval;
private Timer heartbeatTimeout;
private HeartbeatController heartbeat;
public static class BinlogPosition {
final String filename;
@ -230,9 +229,7 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
// Set up for JMX ...
metrics = new BinlogReaderMetrics(client);
heartbeatInterval = Duration.ofSeconds(context.getBinlogReaderHeartbeatInterval());
heartbeatTimeout = resetHeartbeat();
heartbeat = new HeartbeatController(context.config(), context.topicSelector().getHeartbeatTopic(), () -> OffsetPosition.build(source.partition(), source.offset()));
}
@Override
@ -367,11 +364,7 @@ protected void doCleanup() {
@Override
protected void pollComplete(List<SourceRecord> batch) {
// Generate heartbeat message if the time is right
if (heartbeatTimeout.expired()) {
logger.info("Generating heartbeat event");
recordMakers.heartbeat(batch::add);
heartbeatTimeout = resetHeartbeat();
}
heartbeat.heartbeat(batch::add);
// Record a bit about this batch ...
int batchSize = batch.size();
@ -963,8 +956,4 @@ protected BinaryLogClient getBinlogClient() {
public BinlogPosition getCurrentBinlogPosition() {
return new BinlogPosition(client.getBinlogFilename(), client.getBinlogPosition());
}
private Timer resetHeartbeat() {
return Threads.timer(Clock.SYSTEM, heartbeatInterval.isZero() ? Duration.ofMillis(Long.MAX_VALUE) : heartbeatInterval);
}
}

View File

@ -20,6 +20,7 @@
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.config.Field.ValidationOutput;
import io.debezium.heartbeat.HeartbeatController;
import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.jdbc.TemporalPrecisionMode;
@ -387,17 +388,6 @@ public static EventProcessingFailureHandlingMode parse(String value) {
*/
private static final int DEFAULT_BINLOG_BUFFER_SIZE = 0;
/**
* Default length of interval in which BinlogReader generates periodically
* heartbeat messages. A size of 0 disables heartbeat.
*/
private static final int DEFAULT_BINLOG_READER_HEARTBEAT_INTERVAL = 0;
/**
* Default prefix for names of heartbeat topics
*/
private static final String DEFAULT_HEARTBEAT_TOPICS_PREFIX = "__debezium-heartbeat";
public static final Field HOSTNAME = Field.create("database.hostname")
.withDisplayName("Hostname")
.withType(Type.STRING)
@ -694,27 +684,6 @@ public static EventProcessingFailureHandlingMode parse(String value) {
.withDefault(DEFAULT_BINLOG_BUFFER_SIZE)
.withValidation(Field::isNonNegativeInteger);
public static final Field BINLOG_READER_HEARTBEAT_INTERVAL = Field.create("binlog.reader.heartbeat.interval")
.withDisplayName("Binlog reader heartbeat inteval (seconds)")
.withType(Type.INT)
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDescription("The length of interval in which Binlog reader periodically sends heartbeat messages "
+ "to the Connect. "
+ "Use 0 to disable heartbeat. "
+ "Disabled by default.")
.withDefault(DEFAULT_BINLOG_READER_HEARTBEAT_INTERVAL)
.withValidation(Field::isNonNegativeInteger);
public static final Field HEARTBEAT_TOPICS_PREFIX = Field.create("heartbeat.topics.prefix")
.withDisplayName("A prefix used for naming of heartbeat topics")
.withType(Type.STRING)
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withDescription("The prefix that is used to name heartbeat topics."
+ "Defaults to " + DEFAULT_HEARTBEAT_TOPICS_PREFIX + ".")
.withDefault(DEFAULT_HEARTBEAT_TOPICS_PREFIX);
/**
* The database history class is hidden in the {@link #configDef()} since that is designed to work with a user interface,
* and in these situations using Kafka is the only way to go.
@ -864,8 +833,8 @@ public static final Field MASK_COLUMN(int length) {
SERVER_NAME,
CONNECTION_TIMEOUT_MS, KEEP_ALIVE,
MAX_QUEUE_SIZE, MAX_BATCH_SIZE, POLL_INTERVAL_MS,
BUFFER_SIZE_FOR_BINLOG_READER, BINLOG_READER_HEARTBEAT_INTERVAL,
HEARTBEAT_TOPICS_PREFIX, DATABASE_HISTORY, INCLUDE_SCHEMA_CHANGES,
BUFFER_SIZE_FOR_BINLOG_READER, HeartbeatController.HEARTBEAT_INTERVAL,
HeartbeatController.HEARTBEAT_TOPICS_PREFIX, DATABASE_HISTORY, INCLUDE_SCHEMA_CHANGES,
TABLE_WHITELIST, TABLE_BLACKLIST, TABLES_IGNORE_BUILTIN,
DATABASE_WHITELIST, DATABASE_BLACKLIST,
COLUMN_BLACKLIST, SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING,
@ -909,7 +878,7 @@ protected static ConfigDef configDef() {
Field.group(config, "Events", INCLUDE_SCHEMA_CHANGES, TABLES_IGNORE_BUILTIN, DATABASE_WHITELIST, TABLE_WHITELIST,
COLUMN_BLACKLIST, TABLE_BLACKLIST, DATABASE_BLACKLIST,
GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES, GTID_SOURCE_FILTER_DML_EVENTS, BUFFER_SIZE_FOR_BINLOG_READER,
BINLOG_READER_HEARTBEAT_INTERVAL, HEARTBEAT_TOPICS_PREFIX, EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE, INCONSISTENT_SCHEMA_HANDLING_MODE,
HeartbeatController.HEARTBEAT_INTERVAL, HeartbeatController.HEARTBEAT_TOPICS_PREFIX, EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE, INCONSISTENT_SCHEMA_HANDLING_MODE,
CommonConnectorConfig.TOMBSTONES_ON_DELETE);
Field.group(config, "Connector", CONNECTION_TIMEOUT_MS, KEEP_ALIVE, MAX_QUEUE_SIZE, MAX_BATCH_SIZE, POLL_INTERVAL_MS,
SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE,

View File

@ -15,6 +15,7 @@
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.function.Predicates;
import io.debezium.heartbeat.HeartbeatController;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext;
@ -235,12 +236,8 @@ public String getSnapshotSelectOverrides() {
return config.getString(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE);
}
public int getBinlogReaderHeartbeatInterval() {
return config.getInteger(MySqlConnectorConfig.BINLOG_READER_HEARTBEAT_INTERVAL);
}
public String getHeartbeatTopicsPrefix() {
return config.getString(MySqlConnectorConfig.HEARTBEAT_TOPICS_PREFIX);
return config.getString(HeartbeatController.HEARTBEAT_TOPICS_PREFIX);
}
@Override

View File

@ -134,25 +134,6 @@ public int schemaChanges(String databaseName, String ddlStatements, BlockingCons
}
}
/**
* Produce an empty record to heartbeat topic.
*
* @param consumer the consumer for all produced records; may not be null
* @return the number of records produced; will be 0 or 1
*/
public int heartbeat(BlockingConsumer<SourceRecord> consumer) {
String topicName = topicSelector.getHeartbeatTopic();
Integer partition = 0;
SourceRecord record = new SourceRecord(source.partition(), source.offset(),
topicName, partition, null, null, null, null);
try {
consumer.accept(record);
return 1;
} catch (InterruptedException e) {
return 0;
}
}
/**
* Clear all of the cached record makers. This should be done when the logs are rotated, since in that a different table
* numbering scheme will be used by all subsequent TABLE_MAP binlog events.

View File

@ -17,7 +17,7 @@
public interface TopicSelector {
/**
* Get the default topic selector logic, which uses a '.' delimiter character when needed.
*
*
* @param prefix the name of the prefix to be used for all topics; may not be null and must not terminate in the
* {@code delimiter}
* @param heartbeatPrefix the name of the prefix to be used for all heartbeat topics; may not be null and must not terminate in the
@ -30,7 +30,7 @@ static TopicSelector defaultSelector(String prefix, String heartbeatPrefix) {
/**
* Get the default topic selector logic, which uses the supplied delimiter character when needed.
*
*
* @param prefix the name of the prefix to be used for all topics; may not be null and must not terminate in the
* {@code delimiter}
* @param heartbeatPrefix - a prefix that will be used for heartbeat topics. All heartbeat topics will start with this prefix and will use
@ -43,7 +43,7 @@ static TopicSelector defaultSelector(String prefix, String heartbeatPrefix, Stri
/**
* Get the name of the topic for the given server, database, and table names. This method returns
* "{@code <serverName>}".
*
*
* @return the topic name; never null
*/
@Override
@ -54,7 +54,7 @@ public String getPrimaryTopic() {
/**
* Get the name of the topic for the given server name. This method returns
* "{@code <prefix>.<databaseName>.<tableName>}".
*
*
* @param databaseName the name of the database; may not be null
* @param tableName the name of the table; may not be null
* @return the topic name; never null
@ -67,7 +67,7 @@ public String getTopic(String databaseName, String tableName) {
/**
* Get the name of the heartbeat topic for the given server. This method returns
* "{@code <prefix>-heartbeat}".
*
*
* @return the topic name; never null
*/
@Override
@ -80,7 +80,7 @@ public String getHeartbeatTopic() {
/**
* Get the name of the topic for the given server name.
*
*
* @param tableId the identifier of the table; may not be null
* @return the topic name; never null
*/
@ -90,7 +90,7 @@ default String getTopic(TableId tableId) {
/**
* Get the name of the topic for the given server name.
*
*
* @param databaseName the name of the database; may not be null
* @param tableName the name of the table; may not be null
* @return the topic name; never null
@ -99,14 +99,14 @@ default String getTopic(TableId tableId) {
/**
* Get the name of the primary topic.
*
*
* @return the topic name; never null
*/
String getPrimaryTopic();
/**
* Get the name of the heartbeat topic.
*
*
* @return the topic name; never null
*/
String getHeartbeatTopic();

View File

@ -0,0 +1,115 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.heartbeat;
import java.time.Duration;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.util.Clock;
import io.debezium.util.Threads;
import io.debezium.util.Threads.Timer;
/**
* A class that is able to generate periodic heartbeat messages based on a pre-configured interval. The clients are
* supposed to call method {@link #heartbeat(Consumer)} from a main loop of a connector.
*
* @author Jiri Pechanec
*
*/
public class HeartbeatController {
private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatController.class);
/**
* Default length of interval in which connector generates periodically
* heartbeat messages. A size of 0 disables heartbeat.
*/
private static final int DEFAULT_HEARTBEAT_INTERVAL = 0;
/**
* Default prefix for names of heartbeat topics
*/
private static final String DEFAULT_HEARTBEAT_TOPICS_PREFIX = "__debezium-heartbeat";
public static final Field HEARTBEAT_INTERVAL = Field.create("heartbeat.interval")
.withDisplayName("Conector heartbeat inteval (seconds)")
.withType(Type.INT)
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDescription("The length of interval in which connector periodically sends heartbeat messages "
+ "to the Connect. "
+ "Use 0 to disable heartbeat. "
+ "Disabled by default.")
.withDefault(DEFAULT_HEARTBEAT_INTERVAL)
.withValidation(Field::isNonNegativeInteger);
public static final Field HEARTBEAT_TOPICS_PREFIX = Field.create("heartbeat.topics.prefix")
.withDisplayName("A prefix used for naming of heartbeat topics")
.withType(Type.STRING)
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withDescription("The prefix that is used to name heartbeat topics."
+ "Defaults to " + DEFAULT_HEARTBEAT_TOPICS_PREFIX + ".")
.withDefault(DEFAULT_HEARTBEAT_TOPICS_PREFIX);
private final String topicName;
private final Supplier<OffsetPosition> positionSupplier;
private final Configuration configuration;
private Duration heartbeatInterval;
private Timer heartbeatTimeout;
public HeartbeatController(Configuration configuration, String topicName,
Supplier<OffsetPosition> positionSupplier) {
super();
this.configuration = configuration;
this.topicName = topicName;
this.positionSupplier = positionSupplier;
heartbeatInterval = Duration.ofSeconds(this.configuration.getInteger(HeartbeatController.HEARTBEAT_INTERVAL));
heartbeatTimeout = resetHeartbeat();
}
/**
* Generates a heartbeat record if defined time has elapsed
*
* @param consumer - a code to place record among others to be sent into Connect
*/
public void heartbeat(Consumer<SourceRecord> consumer) {
if (heartbeatTimeout.expired()) {
LOGGER.info("Generating heartbeat event");
consumer.accept(heartbeatRecord());
heartbeatTimeout = resetHeartbeat();
}
}
/**
* Produce an empty record to heartbeat topic.
*
*/
private SourceRecord heartbeatRecord() {
final Integer partition = 0;
OffsetPosition position = positionSupplier.get();
return new SourceRecord(position.partition(), position.offset(),
topicName, partition, null, null, null, null);
}
private Timer resetHeartbeat() {
return Threads.timer(Clock.SYSTEM, heartbeatInterval.isZero() ? Duration.ofMillis(Long.MAX_VALUE) : heartbeatInterval);
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.heartbeat;
import java.util.Map;
public interface OffsetPosition {
/**
* Get the Kafka Connect detail about the source "partition", which describes the portion of the source that we are
* consuming.
* <p>
* The resulting map is mutable for efficiency reasons (this information rarely changes), but should not be mutated.
*
* @return the source partition information; never null
*/
Map<String, String> partition();
/**
* Get the Kafka Connect detail about the source "offset", which describes the position within the source where we last
* have last read.
*
* @return a copy of the current offset; never null
*/
Map<String, ?> offset();
static OffsetPosition build(Map<String, String> partition, Map<String, ?> offset) {
return new OffsetPosition() {
@Override
public Map<String, String> partition() {
return partition;
}
@Override
public Map<String, ?> offset() {
return offset;
}
};
}
}