DBZ-668 Changed the key schema for the heartebat messages from STRING to STRUCT

This commit is contained in:
Omar Al-Safi 2018-03-20 17:35:42 +01:00 committed by Gunnar Morling
parent 0743467102
commit 21d2e0b8a9

View File

@ -12,6 +12,8 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -20,6 +22,7 @@
import io.debezium.util.Clock;
import io.debezium.util.Threads;
import io.debezium.util.Threads.Timer;
import io.debezium.util.SchemaNameAdjuster;
/**
* Default implementation of Heartbeat
@ -30,6 +33,7 @@
class HeartbeatImpl implements Heartbeat {
private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatImpl.class);
private static final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);
/**
* Default length of interval in which connector generates periodically
@ -42,7 +46,11 @@ class HeartbeatImpl implements Heartbeat {
*/
static final String DEFAULT_HEARTBEAT_TOPICS_PREFIX = "__debezium-heartbeat";
private static Schema KEY_SCHEMA = Schema.STRING_SCHEMA;
private static final String SERVER_NAME_KEY = "name";
private static Schema KEY_SCHEMA = SchemaBuilder.struct()
.name(schemaNameAdjuster.adjust("io.debezium.connector.mysql.ServerNameKey"))
.field(SERVER_NAME_KEY,Schema.STRING_SCHEMA)
.build();
private final String topicName;
private final Supplier<OffsetPosition> positionSupplier;
@ -79,6 +87,16 @@ public void heartbeat(BlockingConsumer<SourceRecord> consumer) throws Interrupte
}
}
/**
* Produce a key struct for based on the server name and KEY_SCHEMA
*
*/
private Struct serverNameKey(String serverName){
Struct result = new Struct(KEY_SCHEMA);
result.put(SERVER_NAME_KEY, serverName);
return result;
}
/**
* Produce an empty record to the heartbeat topic.
*
@ -88,7 +106,7 @@ private SourceRecord heartbeatRecord() {
OffsetPosition position = positionSupplier.get();
return new SourceRecord(position.partition(), position.offset(),
topicName, partition, KEY_SCHEMA, key, null, null);
topicName, partition, KEY_SCHEMA, serverNameKey(key), null, null);
}
private Timer resetHeartbeat() {