DBZ-7183 Support MySQL 8 high resolution replication timestamps from GTID events

This commit is contained in:
Lourens Naude 2023-11-25 22:52:01 +00:00 committed by Chris Cranford
parent baa2ea1e0f
commit ae25b3a11a
3 changed files with 26 additions and 7 deletions

View File

@ -299,6 +299,7 @@ Liu Hanlin
Liu Lang Wa
Liz Chatman
Lokesh Sanapalli
Lourens Naudé
Luca Scannapieco
Luis Garcés-Erice
Lukas Krejci

View File

@ -9,6 +9,7 @@
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
@ -87,6 +88,7 @@ public class MySqlStreamingChangeEventSource implements StreamingChangeEventSour
private long initialEventsToSkip = 0L;
private boolean skipEvent = false;
private boolean ignoreDmlEventByGtidSource = false;
private final boolean isGtidModeEnabled;
private final Predicate<String> gtidDmlSourceFilter;
private final AtomicLong totalRecordCounter = new AtomicLong();
private volatile Map<String, ?> lastOffset = null;
@ -190,6 +192,7 @@ public MySqlStreamingChangeEventSource(MySqlConnectorConfig connectorConfig, Abs
Configuration configuration = connectorConfig.getConfig();
boolean filterDmlEventsByGtidSource = configuration.getBoolean(MySqlConnectorConfig.GTID_SOURCE_FILTER_DML_EVENTS);
gtidDmlSourceFilter = filterDmlEventsByGtidSource ? connectorConfig.gtidSourceFilter() : null;
isGtidModeEnabled = connection.isGtidModeEnabled();
}
protected void onEvent(MySqlOffsetContext offsetContext, Event event) {
@ -212,11 +215,30 @@ protected void onEvent(MySqlOffsetContext offsetContext, Event event) {
return;
}
ts = clock.currentTimeInMillis() - eventTs;
eventTimestamp = getEventTimestamp(event, eventTs);
ts = clock.currentTimeInMillis() - eventTimestamp.toEpochMilli();
LOGGER.trace("Current milliseconds behind source: {} ms", ts);
metrics.setMilliSecondsBehindSource(ts);
}
private Instant getEventTimestamp(Event event, long eventTs) {
// Prefer higher resolution replication timestamps from MySQL 8 GTID events, if possible
if (isGtidModeEnabled) {
if (event.getHeader().getEventType() == EventType.GTID) {
GtidEventData gtidEvent = unwrapData(event);
final long gtidEventTs = gtidEvent.getOriginalCommitTimestamp();
if (gtidEventTs != 0) {
// >= MySQL 8.0.1, prefer the higher resolution replication timestamp
return Instant.EPOCH.plus(gtidEventTs, ChronoUnit.MICROS);
}
}
}
// Fallback to second resolution event timestamps
return Instant.ofEpochMilli(eventTs);
}
protected void ignoreEvent(MySqlOffsetContext offsetContext, Event event) {
LOGGER.trace("Ignoring event due to missing handler: {}", event);
}
@ -228,10 +250,6 @@ protected void handleEvent(MySqlPartition partition, MySqlOffsetContext offsetCo
}
final EventHeader eventHeader = event.getHeader();
// Update the source offset info. Note that the client returns the value in *milliseconds*, even though the binlog
// contains only *seconds* precision ...
// HEARTBEAT events have no timestamp; only set the timestamp if the event is not a HEARTBEAT
eventTimestamp = !eventHeader.getEventType().equals(EventType.HEARTBEAT) ? Instant.ofEpochMilli(eventHeader.getTimestamp()) : null;
offsetContext.setBinlogServerId(eventHeader.getServerId());
final EventType eventType = eventHeader.getEventType();
@ -869,7 +887,6 @@ public void execute(ChangeEventSourceContext context, MySqlPartition partition,
client.registerEventListener((event) -> logEvent(effectiveOffsetContext, event));
}
final boolean isGtidModeEnabled = connection.isGtidModeEnabled();
metrics.setIsGtidModeEnabled(isGtidModeEnabled);
// Get the current GtidSet from MySQL so we can get a filtered/merged GtidSet based off of the last Debezium checkpoint.

View File

@ -242,3 +242,4 @@ ahmedrachid,Ahmed Rachid Hazourli
sherpa003,Jiri Kulhanek
slknijnenburg,Sebastiaan Knijnenburg
baabgai,baabgai
methodmissing,Lourens Naudé