From 5315dbd9cbe6898faebdc57f316d74d42edc008c Mon Sep 17 00:00:00 2001 From: Vojtech Juranek Date: Wed, 13 Jul 2022 19:18:10 +0200 Subject: [PATCH] DBZ-5047 Remove setting last snapshot event in MySQL connector MySQL connector sets last snapshot event twice - the first time during the streaming and the second time in `MySqlSnapshotChangeEventSource.postSnapshot()` [1]. Remove setting the last snapshot event during the streaming and keep it only in one place in `postSnapshot()` method. [1] https://github.com/debezium/debezium/blob/v2.0.0.Alpha3/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java#L563 --- .../mysql/MySqlSnapshotChangeEventSource.java | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java index b6c3b8a78..25123c96a 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java @@ -243,11 +243,6 @@ protected void releaseDataSnapshotLocks(RelationalSnapshotContext receiver.schemaChangeEvent(event)); } @@ -540,11 +535,6 @@ protected void createSchemaChangeEventsForTables(ChangeEventSourceContext source final TableId tableId = event.getTables().isEmpty() ? null : event.getTables().iterator().next().id(); snapshotContext.offset.event(tableId, getClock().currentTime()); - - // If data are not snapshotted then the last schema change must set last snapshot flag - if (!snapshottingTask.snapshotData() && !i.hasNext()) { - lastSnapshotRecord(snapshotContext); - } dispatcher.dispatchSchemaChangeEvent(snapshotContext.partition, tableId, (receiver) -> receiver.schemaChangeEvent(event)); } @@ -552,13 +542,6 @@ protected void createSchemaChangeEventsForTables(ChangeEventSourceContext source databaseSchema.tableIds().forEach(x -> snapshotContext.tables.overwriteTable(databaseSchema.tableFor(x))); } - @Override - protected void lastSnapshotRecord(RelationalSnapshotContext snapshotContext) { - if (delayedSchemaSnapshotTables.isEmpty()) { - super.lastSnapshotRecord(snapshotContext); - } - } - @Override protected void postSnapshot() throws InterruptedException { // We cannot be sure that the last event as the last one