diff --git a/COPYRIGHT.txt b/COPYRIGHT.txt index d426f9783..09fb80908 100644 --- a/COPYRIGHT.txt +++ b/COPYRIGHT.txt @@ -299,6 +299,7 @@ Liu Hanlin Liu Lang Wa Liz Chatman Lokesh Sanapalli +Lourens Naudé Luca Scannapieco Luis Garcés-Erice Lukas Krejci diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java index 8fcfbfad5..aa02b39c0 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java @@ -9,6 +9,7 @@ import java.sql.SQLException; import java.time.Duration; import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.EnumMap; import java.util.List; import java.util.Map; @@ -87,6 +88,7 @@ public class MySqlStreamingChangeEventSource implements StreamingChangeEventSour private long initialEventsToSkip = 0L; private boolean skipEvent = false; private boolean ignoreDmlEventByGtidSource = false; + private final boolean isGtidModeEnabled; private final Predicate gtidDmlSourceFilter; private final AtomicLong totalRecordCounter = new AtomicLong(); private volatile Map lastOffset = null; @@ -190,6 +192,7 @@ public MySqlStreamingChangeEventSource(MySqlConnectorConfig connectorConfig, Abs Configuration configuration = connectorConfig.getConfig(); boolean filterDmlEventsByGtidSource = configuration.getBoolean(MySqlConnectorConfig.GTID_SOURCE_FILTER_DML_EVENTS); gtidDmlSourceFilter = filterDmlEventsByGtidSource ? connectorConfig.gtidSourceFilter() : null; + isGtidModeEnabled = connection.isGtidModeEnabled(); } protected void onEvent(MySqlOffsetContext offsetContext, Event event) { @@ -212,11 +215,30 @@ protected void onEvent(MySqlOffsetContext offsetContext, Event event) { return; } - ts = clock.currentTimeInMillis() - eventTs; + eventTimestamp = getEventTimestamp(event, eventTs); + + ts = clock.currentTimeInMillis() - eventTimestamp.toEpochMilli(); LOGGER.trace("Current milliseconds behind source: {} ms", ts); metrics.setMilliSecondsBehindSource(ts); } + private Instant getEventTimestamp(Event event, long eventTs) { + // Prefer higher resolution replication timestamps from MySQL 8 GTID events, if possible + if (isGtidModeEnabled) { + if (event.getHeader().getEventType() == EventType.GTID) { + GtidEventData gtidEvent = unwrapData(event); + final long gtidEventTs = gtidEvent.getOriginalCommitTimestamp(); + if (gtidEventTs != 0) { + // >= MySQL 8.0.1, prefer the higher resolution replication timestamp + return Instant.EPOCH.plus(gtidEventTs, ChronoUnit.MICROS); + } + } + } + + // Fallback to second resolution event timestamps + return Instant.ofEpochMilli(eventTs); + } + protected void ignoreEvent(MySqlOffsetContext offsetContext, Event event) { LOGGER.trace("Ignoring event due to missing handler: {}", event); } @@ -228,10 +250,6 @@ protected void handleEvent(MySqlPartition partition, MySqlOffsetContext offsetCo } final EventHeader eventHeader = event.getHeader(); - // Update the source offset info. Note that the client returns the value in *milliseconds*, even though the binlog - // contains only *seconds* precision ... - // HEARTBEAT events have no timestamp; only set the timestamp if the event is not a HEARTBEAT - eventTimestamp = !eventHeader.getEventType().equals(EventType.HEARTBEAT) ? Instant.ofEpochMilli(eventHeader.getTimestamp()) : null; offsetContext.setBinlogServerId(eventHeader.getServerId()); final EventType eventType = eventHeader.getEventType(); @@ -869,7 +887,6 @@ public void execute(ChangeEventSourceContext context, MySqlPartition partition, client.registerEventListener((event) -> logEvent(effectiveOffsetContext, event)); } - final boolean isGtidModeEnabled = connection.isGtidModeEnabled(); metrics.setIsGtidModeEnabled(isGtidModeEnabled); // Get the current GtidSet from MySQL so we can get a filtered/merged GtidSet based off of the last Debezium checkpoint. diff --git a/jenkins-jobs/scripts/config/Aliases.txt b/jenkins-jobs/scripts/config/Aliases.txt index 18e6a6d24..17198aabd 100644 --- a/jenkins-jobs/scripts/config/Aliases.txt +++ b/jenkins-jobs/scripts/config/Aliases.txt @@ -241,4 +241,5 @@ Lars M Johansson,Lars M. Johansson ahmedrachid,Ahmed Rachid Hazourli sherpa003,Jiri Kulhanek slknijnenburg,Sebastiaan Knijnenburg -baabgai,baabgai \ No newline at end of file +baabgai,baabgai +methodmissing,Lourens Naudé