From 21d2e0b8a98cc5a15d38a39c378c4591cb069bde Mon Sep 17 00:00:00 2001 From: Omar Al-Safi Date: Tue, 20 Mar 2018 17:35:42 +0100 Subject: [PATCH] DBZ-668 Changed the key schema for the heartebat messages from STRING to STRUCT --- .../io/debezium/heartbeat/HeartbeatImpl.java | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/debezium-core/src/main/java/io/debezium/heartbeat/HeartbeatImpl.java b/debezium-core/src/main/java/io/debezium/heartbeat/HeartbeatImpl.java index 50c56fb7a..4eb1ebdf2 100644 --- a/debezium-core/src/main/java/io/debezium/heartbeat/HeartbeatImpl.java +++ b/debezium-core/src/main/java/io/debezium/heartbeat/HeartbeatImpl.java @@ -12,6 +12,8 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.data.SchemaBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,6 +22,7 @@ import io.debezium.util.Clock; import io.debezium.util.Threads; import io.debezium.util.Threads.Timer; +import io.debezium.util.SchemaNameAdjuster; /** * Default implementation of Heartbeat @@ -30,6 +33,7 @@ class HeartbeatImpl implements Heartbeat { private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatImpl.class); + private static final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER); /** * Default length of interval in which connector generates periodically @@ -42,7 +46,11 @@ class HeartbeatImpl implements Heartbeat { */ static final String DEFAULT_HEARTBEAT_TOPICS_PREFIX = "__debezium-heartbeat"; - private static Schema KEY_SCHEMA = Schema.STRING_SCHEMA; + private static final String SERVER_NAME_KEY = "name"; + private static Schema KEY_SCHEMA = SchemaBuilder.struct() + .name(schemaNameAdjuster.adjust("io.debezium.connector.mysql.ServerNameKey")) + .field(SERVER_NAME_KEY,Schema.STRING_SCHEMA) + .build(); private final String topicName; private final Supplier positionSupplier; @@ -79,6 +87,16 @@ public void heartbeat(BlockingConsumer consumer) throws Interrupte } } + /** + * Produce a key struct for based on the server name and KEY_SCHEMA + * + */ + private Struct serverNameKey(String serverName){ + Struct result = new Struct(KEY_SCHEMA); + result.put(SERVER_NAME_KEY, serverName); + return result; + } + /** * Produce an empty record to the heartbeat topic. * @@ -88,7 +106,7 @@ private SourceRecord heartbeatRecord() { OffsetPosition position = positionSupplier.get(); return new SourceRecord(position.partition(), position.offset(), - topicName, partition, KEY_SCHEMA, key, null, null); + topicName, partition, KEY_SCHEMA, serverNameKey(key), null, null); } private Timer resetHeartbeat() {