From 70aa83b371a68ccf49ebb73d2b22197876a6690c Mon Sep 17 00:00:00 2001 From: Lourens Naude Date: Fri, 16 Feb 2024 23:35:05 +0000 Subject: [PATCH] DBZ-7500 Fix MySQL 8 event timestamp resolution logic error where fallback to seconds occurs erroneously for non-GTID events --- .../mysql/MySqlStreamingChangeEventSource.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java index 52a6320a6..58df1e867 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java @@ -219,14 +219,14 @@ protected void onEvent(MySqlOffsetContext offsetContext, Event event) { return; } - eventTimestamp = getEventTimestamp(event, eventTs); + setEventTimestamp(event, eventTs); ts = clock.currentTimeInMillis() - eventTimestamp.toEpochMilli(); LOGGER.trace("Current milliseconds behind source: {} ms", ts); metrics.setMilliSecondsBehindSource(ts); } - private Instant getEventTimestamp(Event event, long eventTs) { + private void setEventTimestamp(Event event, long eventTs) { // Prefer higher resolution replication timestamps from MySQL 8 GTID events, if possible if (isGtidModeEnabled) { if (event.getHeader().getEventType() == EventType.GTID) { @@ -234,13 +234,18 @@ private Instant getEventTimestamp(Event event, long eventTs) { final long gtidEventTs = gtidEvent.getOriginalCommitTimestamp(); if (gtidEventTs != 0) { // >= MySQL 8.0.1, prefer the higher resolution replication timestamp - return Instant.EPOCH.plus(gtidEventTs, ChronoUnit.MICROS); + eventTimestamp = Instant.EPOCH.plus(gtidEventTs, ChronoUnit.MICROS); + } + else { + // Fallback to second resolution event timestamps + eventTimestamp = Instant.ofEpochMilli(eventTs); } } } - - // Fallback to second resolution event timestamps - return Instant.ofEpochMilli(eventTs); + else { + // Fallback to second resolution event timestamps + eventTimestamp = Instant.ofEpochMilli(eventTs); + } } protected void ignoreEvent(MySqlOffsetContext offsetContext, Event event) {