DBZ-5047 Remove setting last snapshot event in MySQL connector

MySQL connector sets last snapshot event twice - the first time during
the streaming and the second time in
`MySqlSnapshotChangeEventSource.postSnapshot()` [1]. Remove setting the
last snapshot event during the streaming and keep it only in one place
in `postSnapshot()` method.

[1] https://github.com/debezium/debezium/blob/v2.0.0.Alpha3/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java#L563
This commit is contained in:
Vojtech Juranek 2022-07-13 19:18:10 +02:00 committed by Jiri Pechanec
parent 109c3d1e0d
commit 5315dbd9cb

View File

@ -243,11 +243,6 @@ protected void releaseDataSnapshotLocks(RelationalSnapshotContext<MySqlPartition
final TableId tableId = event.getTables().isEmpty() ? null : event.getTables().iterator().next().id();
snapshotContext.offset.event(tableId, getClock().currentTime());
if (!i.hasNext()) {
super.lastSnapshotRecord(snapshotContext);
}
dispatcher.dispatchSchemaChangeEvent(snapshotContext.partition, tableId, (receiver) -> receiver.schemaChangeEvent(event));
}
@ -540,11 +535,6 @@ protected void createSchemaChangeEventsForTables(ChangeEventSourceContext source
final TableId tableId = event.getTables().isEmpty() ? null : event.getTables().iterator().next().id();
snapshotContext.offset.event(tableId, getClock().currentTime());
// If data are not snapshotted then the last schema change must set last snapshot flag
if (!snapshottingTask.snapshotData() && !i.hasNext()) {
lastSnapshotRecord(snapshotContext);
}
dispatcher.dispatchSchemaChangeEvent(snapshotContext.partition, tableId, (receiver) -> receiver.schemaChangeEvent(event));
}
@ -552,13 +542,6 @@ protected void createSchemaChangeEventsForTables(ChangeEventSourceContext source
databaseSchema.tableIds().forEach(x -> snapshotContext.tables.overwriteTable(databaseSchema.tableFor(x)));
}
@Override
protected void lastSnapshotRecord(RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext) {
if (delayedSchemaSnapshotTables.isEmpty()) {
super.lastSnapshotRecord(snapshotContext);
}
}
@Override
protected void postSnapshot() throws InterruptedException {
// We cannot be sure that the last event as the last one