diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SourceInfo.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SourceInfo.java index 36cac33fa..703f105dd 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SourceInfo.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SourceInfo.java @@ -211,6 +211,7 @@ public void setBinlogStartPoint(String binlogFilename, long positionOfFirstEvent this.restartBinlogPosition = positionOfFirstEvent; this.currentRowNumber = 0; this.restartRowsToSkip = 0; + this.restartEventsToSkip = 0; } /** @@ -224,6 +225,8 @@ public void setEventPosition(long positionOfCurrentEvent, long eventSizeInBytes) this.currentEventLengthInBytes = eventSizeInBytes; if (!inTransaction) { this.restartBinlogPosition = positionOfCurrentEvent + eventSizeInBytes; + this.restartRowsToSkip = 0; + this.restartEventsToSkip = 0; } // Don't set anything else, since the row numbers are set in the offset(int,int) method called at least once // for each processed event diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java index 2567bd864..6c35a1181 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java @@ -728,8 +728,8 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio } else { // the replica is not the same server as the master, so it will have a different binlog filename and position ... } - // Event number is 2 ... - assertThat(persistedOffsetSource.eventsToSkipUponRestart()).isEqualTo(2); + // Last event is 'SHOW MASTER STATUS' which will reset the event number to 0 ... + assertThat(persistedOffsetSource.eventsToSkipUponRestart()).isEqualTo(0); // GTID set should match the before-inserts GTID set ... // assertThat(persistedOffsetSource.gtidSet()).isEqualTo(positionBeforeInserts.gtidSet());