From 3c1ea8206b0025f8f9c283ac60527f8b98f15103 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Tue, 30 Jan 2018 12:21:32 +0100 Subject: [PATCH] DBZ-220 Refactor heartbeat to a separate class --- .../connector/mysql/BinlogReader.java | 21 +--- .../connector/mysql/MySqlConnectorConfig.java | 39 +----- .../connector/mysql/MySqlTaskContext.java | 7 +- .../connector/mysql/RecordMakers.java | 19 --- .../connector/mysql/TopicSelector.java | 18 +-- .../heartbeat/HeartbeatController.java | 115 ++++++++++++++++++ .../io/debezium/heartbeat/OffsetPosition.java | 44 +++++++ 7 files changed, 179 insertions(+), 84 deletions(-) create mode 100644 debezium-core/src/main/java/io/debezium/heartbeat/HeartbeatController.java create mode 100644 debezium-core/src/main/java/io/debezium/heartbeat/OffsetPosition.java diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java index a8b48bff0..fbad1f702 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java @@ -7,7 +7,6 @@ import java.io.IOException; import java.io.Serializable; -import java.time.Duration; import java.util.BitSet; import java.util.EnumMap; import java.util.HashMap; @@ -47,12 +46,13 @@ import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode; import io.debezium.connector.mysql.RecordMakers.RecordsForTable; import io.debezium.function.BlockingConsumer; +import io.debezium.heartbeat.HeartbeatController; +import io.debezium.heartbeat.OffsetPosition; import io.debezium.relational.TableId; import io.debezium.util.Clock; import io.debezium.util.ElapsedTimeStrategy; import io.debezium.util.Strings; import io.debezium.util.Threads; -import io.debezium.util.Threads.Timer; /** * A component that reads the binlog of a MySQL server, and records any schema changes in {@link MySqlSchema}. @@ -86,8 +86,7 @@ public class BinlogReader extends AbstractReader { private final AtomicLong totalRecordCounter = new AtomicLong(); private volatile Map lastOffset = null; private com.github.shyiko.mysql.binlog.GtidSet gtidSet; - private Duration heartbeatInterval; - private Timer heartbeatTimeout; + private HeartbeatController heartbeat; public static class BinlogPosition { final String filename; @@ -230,9 +229,7 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException { // Set up for JMX ... metrics = new BinlogReaderMetrics(client); - - heartbeatInterval = Duration.ofSeconds(context.getBinlogReaderHeartbeatInterval()); - heartbeatTimeout = resetHeartbeat(); + heartbeat = new HeartbeatController(context.config(), context.topicSelector().getHeartbeatTopic(), () -> OffsetPosition.build(source.partition(), source.offset())); } @Override @@ -367,11 +364,7 @@ protected void doCleanup() { @Override protected void pollComplete(List batch) { // Generate heartbeat message if the time is right - if (heartbeatTimeout.expired()) { - logger.info("Generating heartbeat event"); - recordMakers.heartbeat(batch::add); - heartbeatTimeout = resetHeartbeat(); - } + heartbeat.heartbeat(batch::add); // Record a bit about this batch ... int batchSize = batch.size(); @@ -963,8 +956,4 @@ protected BinaryLogClient getBinlogClient() { public BinlogPosition getCurrentBinlogPosition() { return new BinlogPosition(client.getBinlogFilename(), client.getBinlogPosition()); } - - private Timer resetHeartbeat() { - return Threads.timer(Clock.SYSTEM, heartbeatInterval.isZero() ? Duration.ofMillis(Long.MAX_VALUE) : heartbeatInterval); - } } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java index 3a09066ba..f39e0c0e5 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java @@ -20,6 +20,7 @@ import io.debezium.config.EnumeratedValue; import io.debezium.config.Field; import io.debezium.config.Field.ValidationOutput; +import io.debezium.heartbeat.HeartbeatController; import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode; import io.debezium.jdbc.JdbcValueConverters.DecimalMode; import io.debezium.jdbc.TemporalPrecisionMode; @@ -387,17 +388,6 @@ public static EventProcessingFailureHandlingMode parse(String value) { */ private static final int DEFAULT_BINLOG_BUFFER_SIZE = 0; - /** - * Default length of interval in which BinlogReader generates periodically - * heartbeat messages. A size of 0 disables heartbeat. - */ - private static final int DEFAULT_BINLOG_READER_HEARTBEAT_INTERVAL = 0; - - /** - * Default prefix for names of heartbeat topics - */ - private static final String DEFAULT_HEARTBEAT_TOPICS_PREFIX = "__debezium-heartbeat"; - public static final Field HOSTNAME = Field.create("database.hostname") .withDisplayName("Hostname") .withType(Type.STRING) @@ -694,27 +684,6 @@ public static EventProcessingFailureHandlingMode parse(String value) { .withDefault(DEFAULT_BINLOG_BUFFER_SIZE) .withValidation(Field::isNonNegativeInteger); - public static final Field BINLOG_READER_HEARTBEAT_INTERVAL = Field.create("binlog.reader.heartbeat.interval") - .withDisplayName("Binlog reader heartbeat inteval (seconds)") - .withType(Type.INT) - .withWidth(Width.MEDIUM) - .withImportance(Importance.MEDIUM) - .withDescription("The length of interval in which Binlog reader periodically sends heartbeat messages " - + "to the Connect. " - + "Use 0 to disable heartbeat. " - + "Disabled by default.") - .withDefault(DEFAULT_BINLOG_READER_HEARTBEAT_INTERVAL) - .withValidation(Field::isNonNegativeInteger); - - public static final Field HEARTBEAT_TOPICS_PREFIX = Field.create("heartbeat.topics.prefix") - .withDisplayName("A prefix used for naming of heartbeat topics") - .withType(Type.STRING) - .withWidth(Width.MEDIUM) - .withImportance(Importance.LOW) - .withDescription("The prefix that is used to name heartbeat topics." - + "Defaults to " + DEFAULT_HEARTBEAT_TOPICS_PREFIX + ".") - .withDefault(DEFAULT_HEARTBEAT_TOPICS_PREFIX); - /** * The database history class is hidden in the {@link #configDef()} since that is designed to work with a user interface, * and in these situations using Kafka is the only way to go. @@ -864,8 +833,8 @@ public static final Field MASK_COLUMN(int length) { SERVER_NAME, CONNECTION_TIMEOUT_MS, KEEP_ALIVE, MAX_QUEUE_SIZE, MAX_BATCH_SIZE, POLL_INTERVAL_MS, - BUFFER_SIZE_FOR_BINLOG_READER, BINLOG_READER_HEARTBEAT_INTERVAL, - HEARTBEAT_TOPICS_PREFIX, DATABASE_HISTORY, INCLUDE_SCHEMA_CHANGES, + BUFFER_SIZE_FOR_BINLOG_READER, HeartbeatController.HEARTBEAT_INTERVAL, + HeartbeatController.HEARTBEAT_TOPICS_PREFIX, DATABASE_HISTORY, INCLUDE_SCHEMA_CHANGES, TABLE_WHITELIST, TABLE_BLACKLIST, TABLES_IGNORE_BUILTIN, DATABASE_WHITELIST, DATABASE_BLACKLIST, COLUMN_BLACKLIST, SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING, @@ -909,7 +878,7 @@ protected static ConfigDef configDef() { Field.group(config, "Events", INCLUDE_SCHEMA_CHANGES, TABLES_IGNORE_BUILTIN, DATABASE_WHITELIST, TABLE_WHITELIST, COLUMN_BLACKLIST, TABLE_BLACKLIST, DATABASE_BLACKLIST, GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES, GTID_SOURCE_FILTER_DML_EVENTS, BUFFER_SIZE_FOR_BINLOG_READER, - BINLOG_READER_HEARTBEAT_INTERVAL, HEARTBEAT_TOPICS_PREFIX, EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE, INCONSISTENT_SCHEMA_HANDLING_MODE, + HeartbeatController.HEARTBEAT_INTERVAL, HeartbeatController.HEARTBEAT_TOPICS_PREFIX, EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE, INCONSISTENT_SCHEMA_HANDLING_MODE, CommonConnectorConfig.TOMBSTONES_ON_DELETE); Field.group(config, "Connector", CONNECTION_TIMEOUT_MS, KEEP_ALIVE, MAX_QUEUE_SIZE, MAX_BATCH_SIZE, POLL_INTERVAL_MS, SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE, diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java index ef2ed4b2b..89d84f69c 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java @@ -15,6 +15,7 @@ import io.debezium.config.Configuration; import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode; import io.debezium.function.Predicates; +import io.debezium.heartbeat.HeartbeatController; import io.debezium.relational.history.DatabaseHistory; import io.debezium.util.Clock; import io.debezium.util.LoggingContext; @@ -235,12 +236,8 @@ public String getSnapshotSelectOverrides() { return config.getString(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE); } - public int getBinlogReaderHeartbeatInterval() { - return config.getInteger(MySqlConnectorConfig.BINLOG_READER_HEARTBEAT_INTERVAL); - } - public String getHeartbeatTopicsPrefix() { - return config.getString(MySqlConnectorConfig.HEARTBEAT_TOPICS_PREFIX); + return config.getString(HeartbeatController.HEARTBEAT_TOPICS_PREFIX); } @Override diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java index d429bd146..885e38924 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java @@ -134,25 +134,6 @@ public int schemaChanges(String databaseName, String ddlStatements, BlockingCons } } - /** - * Produce an empty record to heartbeat topic. - * - * @param consumer the consumer for all produced records; may not be null - * @return the number of records produced; will be 0 or 1 - */ - public int heartbeat(BlockingConsumer consumer) { - String topicName = topicSelector.getHeartbeatTopic(); - Integer partition = 0; - SourceRecord record = new SourceRecord(source.partition(), source.offset(), - topicName, partition, null, null, null, null); - try { - consumer.accept(record); - return 1; - } catch (InterruptedException e) { - return 0; - } - } - /** * Clear all of the cached record makers. This should be done when the logs are rotated, since in that a different table * numbering scheme will be used by all subsequent TABLE_MAP binlog events. diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TopicSelector.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TopicSelector.java index 0a0d72d59..55e0de4b3 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TopicSelector.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TopicSelector.java @@ -17,7 +17,7 @@ public interface TopicSelector { /** * Get the default topic selector logic, which uses a '.' delimiter character when needed. - * + * * @param prefix the name of the prefix to be used for all topics; may not be null and must not terminate in the * {@code delimiter} * @param heartbeatPrefix the name of the prefix to be used for all heartbeat topics; may not be null and must not terminate in the @@ -30,7 +30,7 @@ static TopicSelector defaultSelector(String prefix, String heartbeatPrefix) { /** * Get the default topic selector logic, which uses the supplied delimiter character when needed. - * + * * @param prefix the name of the prefix to be used for all topics; may not be null and must not terminate in the * {@code delimiter} * @param heartbeatPrefix - a prefix that will be used for heartbeat topics. All heartbeat topics will start with this prefix and will use @@ -43,7 +43,7 @@ static TopicSelector defaultSelector(String prefix, String heartbeatPrefix, Stri /** * Get the name of the topic for the given server, database, and table names. This method returns * "{@code }". - * + * * @return the topic name; never null */ @Override @@ -54,7 +54,7 @@ public String getPrimaryTopic() { /** * Get the name of the topic for the given server name. This method returns * "{@code ..}". - * + * * @param databaseName the name of the database; may not be null * @param tableName the name of the table; may not be null * @return the topic name; never null @@ -67,7 +67,7 @@ public String getTopic(String databaseName, String tableName) { /** * Get the name of the heartbeat topic for the given server. This method returns * "{@code -heartbeat}". - * + * * @return the topic name; never null */ @Override @@ -80,7 +80,7 @@ public String getHeartbeatTopic() { /** * Get the name of the topic for the given server name. - * + * * @param tableId the identifier of the table; may not be null * @return the topic name; never null */ @@ -90,7 +90,7 @@ default String getTopic(TableId tableId) { /** * Get the name of the topic for the given server name. - * + * * @param databaseName the name of the database; may not be null * @param tableName the name of the table; may not be null * @return the topic name; never null @@ -99,14 +99,14 @@ default String getTopic(TableId tableId) { /** * Get the name of the primary topic. - * + * * @return the topic name; never null */ String getPrimaryTopic(); /** * Get the name of the heartbeat topic. - * + * * @return the topic name; never null */ String getHeartbeatTopic(); diff --git a/debezium-core/src/main/java/io/debezium/heartbeat/HeartbeatController.java b/debezium-core/src/main/java/io/debezium/heartbeat/HeartbeatController.java new file mode 100644 index 000000000..2f225aac0 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/heartbeat/HeartbeatController.java @@ -0,0 +1,115 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.heartbeat; + +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigDef.Width; +import org.apache.kafka.connect.source.SourceRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.config.Configuration; +import io.debezium.config.Field; +import io.debezium.util.Clock; +import io.debezium.util.Threads; +import io.debezium.util.Threads.Timer; + +/** + * A class that is able to generate periodic heartbeat messages based on a pre-configured interval. The clients are + * supposed to call method {@link #heartbeat(Consumer)} from a main loop of a connector. + * + * @author Jiri Pechanec + * + */ +public class HeartbeatController { + + private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatController.class); + + /** + * Default length of interval in which connector generates periodically + * heartbeat messages. A size of 0 disables heartbeat. + */ + private static final int DEFAULT_HEARTBEAT_INTERVAL = 0; + + /** + * Default prefix for names of heartbeat topics + */ + private static final String DEFAULT_HEARTBEAT_TOPICS_PREFIX = "__debezium-heartbeat"; + + public static final Field HEARTBEAT_INTERVAL = Field.create("heartbeat.interval") + .withDisplayName("Conector heartbeat inteval (seconds)") + .withType(Type.INT) + .withWidth(Width.MEDIUM) + .withImportance(Importance.MEDIUM) + .withDescription("The length of interval in which connector periodically sends heartbeat messages " + + "to the Connect. " + + "Use 0 to disable heartbeat. " + + "Disabled by default.") + .withDefault(DEFAULT_HEARTBEAT_INTERVAL) + .withValidation(Field::isNonNegativeInteger); + + public static final Field HEARTBEAT_TOPICS_PREFIX = Field.create("heartbeat.topics.prefix") + .withDisplayName("A prefix used for naming of heartbeat topics") + .withType(Type.STRING) + .withWidth(Width.MEDIUM) + .withImportance(Importance.LOW) + .withDescription("The prefix that is used to name heartbeat topics." + + "Defaults to " + DEFAULT_HEARTBEAT_TOPICS_PREFIX + ".") + .withDefault(DEFAULT_HEARTBEAT_TOPICS_PREFIX); + + private final String topicName; + private final Supplier positionSupplier; + private final Configuration configuration; + + private Duration heartbeatInterval; + private Timer heartbeatTimeout; + + public HeartbeatController(Configuration configuration, String topicName, + Supplier positionSupplier) { + super(); + this.configuration = configuration; + this.topicName = topicName; + this.positionSupplier = positionSupplier; + + heartbeatInterval = Duration.ofSeconds(this.configuration.getInteger(HeartbeatController.HEARTBEAT_INTERVAL)); + heartbeatTimeout = resetHeartbeat(); + } + + /** + * Generates a heartbeat record if defined time has elapsed + * + * @param consumer - a code to place record among others to be sent into Connect + */ + public void heartbeat(Consumer consumer) { + if (heartbeatTimeout.expired()) { + LOGGER.info("Generating heartbeat event"); + consumer.accept(heartbeatRecord()); + heartbeatTimeout = resetHeartbeat(); + } + } + + /** + * Produce an empty record to heartbeat topic. + * + */ + private SourceRecord heartbeatRecord() { + final Integer partition = 0; + OffsetPosition position = positionSupplier.get(); + + return new SourceRecord(position.partition(), position.offset(), + topicName, partition, null, null, null, null); + } + + private Timer resetHeartbeat() { + return Threads.timer(Clock.SYSTEM, heartbeatInterval.isZero() ? Duration.ofMillis(Long.MAX_VALUE) : heartbeatInterval); + } + +} \ No newline at end of file diff --git a/debezium-core/src/main/java/io/debezium/heartbeat/OffsetPosition.java b/debezium-core/src/main/java/io/debezium/heartbeat/OffsetPosition.java new file mode 100644 index 000000000..c0e298cd6 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/heartbeat/OffsetPosition.java @@ -0,0 +1,44 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.heartbeat; + +import java.util.Map; + +public interface OffsetPosition { + + /** + * Get the Kafka Connect detail about the source "partition", which describes the portion of the source that we are + * consuming. + *

+ * The resulting map is mutable for efficiency reasons (this information rarely changes), but should not be mutated. + * + * @return the source partition information; never null + */ + Map partition(); + + /** + * Get the Kafka Connect detail about the source "offset", which describes the position within the source where we last + * have last read. + * + * @return a copy of the current offset; never null + */ + Map offset(); + + static OffsetPosition build(Map partition, Map offset) { + return new OffsetPosition() { + + @Override + public Map partition() { + return partition; + } + + @Override + public Map offset() { + return offset; + } + }; + } +} \ No newline at end of file