DBZ-7889: Multiple completed reading from a capture instance notifications

There is a small chance the connector doesn't advance and re-reads
the same LSN range. This happens under the following conditions:
* a new capture instance has been added in the current LSN range;
* while reading CDC changes one of existing capture instances
  dissapears.
The dissapeared capture instance causes an exception which is catched
and processed in `processErrorFromChangeTableQuery`. This leads to
the current connector iteration to be correctly exited without
advancing. On the next iteration the connector starts from the same
LSN as the previous iteration and finds the same new capture instance.
Although `Set` was used to track the list of tables to be removed
`SqlServerChangeTable` doesn't implement `hashCode` so same table
could be added multiple times to the same set.

The fix is to implement `hashCode` and `equals` methods in `ChangeTable`
which is the parent class of `SqlServerChangeTable`.

Additionally a synchronisation block is needed where the tables
are added to the hash map as it happens in a different thread from
the one that removes the tables from the hash map.
This commit is contained in:
Vadzim Ramanenka 2024-01-31 17:49:20 +01:00 committed by Jiri Pechanec
parent 99c9a805b9
commit 2917b78881
2 changed files with 23 additions and 2 deletions

View File

@ -369,8 +369,10 @@ public SqlServerOffsetContext getOffsetContext() {
private void collectChangeTablesWithKnownStopLsn(SqlServerPartition partition, SqlServerChangeTable[] tables) {
for (SqlServerChangeTable table : tables) {
if (table.getStopLsn().isAvailable()) {
LOGGER.info("The stop lsn of {} change table became known", table);
changeTablesWithKnownStopLsn.computeIfAbsent(partition, x -> new HashSet<>()).add(table);
synchronized (changeTablesWithKnownStopLsn) {
LOGGER.info("The stop lsn of {} change table became known", table);
changeTablesWithKnownStopLsn.computeIfAbsent(partition, x -> new HashSet<>()).add(table);
}
}
}
}

View File

@ -5,6 +5,8 @@
*/
package io.debezium.relational;
import java.util.Objects;
/**
* A logical representation of a change table containing changes for a given source table.
* There is usually one change table for each source table. When the schema of the source table is changed,
@ -64,6 +66,23 @@ public void setSourceTable(Table sourceTable) {
this.sourceTable = sourceTable;
}
@Override
public int hashCode() {
return Objects.hash(captureInstance, sourceTableId, changeTableId, changeTableObjectId);
}
@Override
public boolean equals(Object obj) {
if (obj instanceof ChangeTable) {
ChangeTable that = (ChangeTable) obj;
return captureInstance.equals(that.captureInstance)
&& sourceTableId.equals(that.sourceTableId)
&& changeTableId.equals(that.changeTableId)
&& changeTableObjectId == that.changeTableObjectId;
}
return false;
}
@Override
public String toString() {
return "ChangeTable{" +