DBZ-60 Added MySQL server ID and timestamp to event's source info

Added to the Debezium event message's `source` information the ID of the MySQL server (the cluster process that recorded the event) and the MySQL timestamp at which the event was recorded.
Randall Hauch 2016-05-20 09:18:54 -05:00
parent a857fc919f
commit 47a93b3ae1
3 changed files with 34 additions and 2 deletions
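For a concrete picture of the change (this example is not part of the commit; it assumes the Kafka Connect Struct API and the field names defined in SourceInfo below), a consumer holding the `source` Struct could read the two new pieces of metadata alongside the existing binlog position:

import org.apache.kafka.connect.data.Struct;

// Hypothetical helper, not Debezium code: reads the metadata this commit adds
// ("server-id" and "ts") together with the existing binlog position fields.
public class SourceFieldsExample {
    public static void describe(Struct source) {
        long serverId = source.getInt64("server-id"); // ID of the MySQL server that recorded the event
        long timestamp = source.getInt64("ts");       // timestamp recorded in the binlog event header
        String file = source.getString("file");       // binlog filename
        long position = source.getInt64("pos");       // binlog position
        int row = source.getInt32("row");             // row number within the event
        System.out.printf("server-id=%d ts=%d at %s/%d row %d%n",
                serverId, timestamp, file, position, row);
    }
}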

@@ -274,6 +274,8 @@ public List<SourceRecord> poll() throws InterruptedException {
// Update the source offset info ...
EventHeader eventHeader = event.getHeader();
source.setBinlogTimestamp(eventHeader.getTimestamp());
source.setBinlogServerId(eventHeader.getServerId());
EventType eventType = eventHeader.getEventType();
if (eventType == EventType.ROTATE) {
EventData eventData = event.getData();
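The hunk above copies two values out of the binlog event header before the event type is examined. As a standalone sketch (not from this commit; it assumes the mysql-binlog-connector-java client the connector builds on, and it would have to live in the connector's package because SourceInfo is package-private), the same capture looks like this:

import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeader;

// Illustrative fragment mirroring the poll() change above: record the header
// metadata in SourceInfo before the event payload is processed.
final class HeaderCaptureExample {
    static void captureHeader(Event event, SourceInfo source) {
        EventHeader header = event.getHeader();
        source.setBinlogTimestamp(header.getTimestamp()); // timestamp the server recorded for the event
        source.setBinlogServerId(header.getServerId());   // server_id of the MySQL server that wrote the event
    }
}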

@@ -45,17 +45,22 @@
@NotThreadSafe
final class SourceInfo {
public static final String SERVER_ID_KEY = "server-id";
public static final String SERVER_NAME_KEY = "name";
public static final String SERVER_PARTITION_KEY = "server";
public static final String BINLOG_FILENAME_OFFSET_KEY = "file";
public static final String BINLOG_POSITION_OFFSET_KEY = "pos";
public static final String BINLOG_EVENT_ROW_NUMBER_OFFSET_KEY = "row";
public static final String BINLOG_EVENT_TIMESTAMP_KEY = "ts";
/**
* A {@link Schema} definition for a {@link Struct} used to store the {@link #partition()} and {@link #offset()} information.
*/
public static final Schema SCHEMA = SchemaBuilder.struct()
.name("io.debezium.connector.mysql.Source")
.field(SERVER_PARTITION_KEY, Schema.STRING_SCHEMA)
.field(SERVER_NAME_KEY, Schema.STRING_SCHEMA)
.field(SERVER_ID_KEY, Schema.INT64_SCHEMA)
.field(BINLOG_EVENT_TIMESTAMP_KEY, Schema.INT64_SCHEMA)
.field(BINLOG_FILENAME_OFFSET_KEY, Schema.STRING_SCHEMA)
.field(BINLOG_POSITION_OFFSET_KEY, Schema.INT64_SCHEMA)
.field(BINLOG_EVENT_ROW_NUMBER_OFFSET_KEY, Schema.INT32_SCHEMA)
@@ -65,6 +70,8 @@ final class SourceInfo {
private long binlogPosition = 4;
private int eventRowNumber = 0;
private String serverName;
private long serverId = 0;
private long binlogTs = 0;
private Map<String, String> sourcePartition;
public SourceInfo() {
@@ -125,10 +132,12 @@ public Schema schema() {
public Struct struct() {
assert serverName != null;
Struct result = new Struct(SCHEMA);
result.put(SERVER_PARTITION_KEY, serverName);
result.put(SERVER_NAME_KEY, serverName);
result.put(SERVER_ID_KEY, serverId);
result.put(BINLOG_FILENAME_OFFSET_KEY, binlogFilename);
result.put(BINLOG_POSITION_OFFSET_KEY, binlogPosition);
result.put(BINLOG_EVENT_ROW_NUMBER_OFFSET_KEY, eventRowNumber);
result.put(BINLOG_EVENT_TIMESTAMP_KEY, binlogTs);
return result;
}
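A minimal usage sketch for the updated struct() (not part of the commit; it assumes a setServerName(...) setter, of which only the backing field is visible in this diff, and it would sit in the same package since SourceInfo is package-private):

import org.apache.kafka.connect.data.Struct;

// Hypothetical usage: populate SourceInfo and read the new fields back from
// the Struct that struct() builds.
final class SourceInfoStructExample {
    static Struct example() {
        SourceInfo source = new SourceInfo();
        source.setServerName("production");        // assumed setter, example value
        source.setBinlogServerId(112233L);         // example server ID
        source.setBinlogTimestamp(1463753934000L); // example timestamp
        Struct struct = source.struct();
        long serverId = struct.getInt64(SourceInfo.SERVER_ID_KEY);        // 112233
        long ts = struct.getInt64(SourceInfo.BINLOG_EVENT_TIMESTAMP_KEY); // 1463753934000
        return struct;
    }
}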
@@ -172,6 +181,24 @@ public void setRowInEvent(int rowNumber) {
this.eventRowNumber = rowNumber;
}
/**
* Set the server ID as found within the MySQL binary log file.
*
* @param serverId the server ID found within the binary log file
*/
public void setBinlogServerId(long serverId) {
this.serverId = serverId;
}
/**
* Set the timestamp as found within the MySQL binary log file.
*
* @param timestamp the timestamp found within the binary log file
*/
public void setBinlogTimestamp(long timestamp) {
this.binlogTs = timestamp;
}
/**
* Set the source offset, as read from Kafka Connect. This method does nothing if the supplied map is null.
*

@@ -166,6 +166,9 @@ public void shouldConsumeAllEventsFromDatabase() throws SQLException, Interrupte
assertDelete(updates.get(1), "id", 1001);
assertTombstone(updates.get(2), "id", 1001);
//Testing.Print.enable();
//updates.forEach(this::printJson);
// Stop the connector ...
stopConnector();
}
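The integration test above only adds optional JSON printing and does not yet assert on the new metadata. A hedged sketch of such a check, as a hypothetical helper method one could add to the test class (imports assumed: org.junit.Assert.assertTrue and org.apache.kafka.connect.data.Struct; how a captured record exposes its source Struct is not shown in this diff):

// Hypothetical assertion for this test class: given the source Struct carried
// by a captured record, verify that the two new fields were populated.
void assertSourceMetadata(Struct source) {
    assertTrue(source.getInt64(SourceInfo.SERVER_ID_KEY) > 0L);
    assertTrue(source.getInt64(SourceInfo.BINLOG_EVENT_TIMESTAMP_KEY) > 0L);
}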