DBZ-897 Skip messages redelivered from database after restart
This commit is contained in:
parent
738fd28df1
commit
ec8784febb
@ -59,6 +59,8 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
|
|||||||
final Metronome metronome = Metronome.sleeper(pollInterval, clock);
|
final Metronome metronome = Metronome.sleeper(pollInterval, clock);
|
||||||
try {
|
try {
|
||||||
final TableId[] tables = schema.getCapturedTables().toArray(new TableId[schema.getCapturedTables().size()]);
|
final TableId[] tables = schema.getCapturedTables().toArray(new TableId[schema.getCapturedTables().size()]);
|
||||||
|
final Lsn lastProcessedLsnOnStart = offsetContext.getChangeLsn();
|
||||||
|
LOGGER.info("Last LSN recorded in offsets is {}", lastProcessedLsnOnStart);
|
||||||
Lsn lastProcessedLsn = offsetContext.getChangeLsn();
|
Lsn lastProcessedLsn = offsetContext.getChangeLsn();
|
||||||
while (context.isRunning()) {
|
while (context.isRunning()) {
|
||||||
final Lsn currentMaxLsn = connection.getMaxLsn();
|
final Lsn currentMaxLsn = connection.getMaxLsn();
|
||||||
@ -106,6 +108,11 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (tableSmallestLsn.getRowLsn().compareTo(lastProcessedLsnOnStart) <= 0) {
|
||||||
|
LOGGER.info("Skipping change {} as its LSN is smaller than the last recorded LSN {}", tableSmallestLsn, lastProcessedLsnOnStart);
|
||||||
|
tableSmallestLsn.next();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
LOGGER.trace("Processing change {}", tableSmallestLsn);
|
LOGGER.trace("Processing change {}", tableSmallestLsn);
|
||||||
final TableId tableId = tableSmallestLsn.getTableId();
|
final TableId tableId = tableSmallestLsn.getTableId();
|
||||||
final Lsn commitLsn = tableSmallestLsn.getCommitLsn();
|
final Lsn commitLsn = tableSmallestLsn.getCommitLsn();
|
||||||
|
@ -211,6 +211,7 @@ public void streamChangesWhileStopped() throws Exception {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
|
||||||
stopConnector();
|
stopConnector();
|
||||||
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
|
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
|
||||||
final int id = ID_RESTART + i;
|
final int id = ID_RESTART + i;
|
||||||
@ -225,17 +226,13 @@ public void streamChangesWhileStopped() throws Exception {
|
|||||||
start(SqlServerConnector.class, config);
|
start(SqlServerConnector.class, config);
|
||||||
assertConnectorIsRunning();
|
assertConnectorIsRunning();
|
||||||
|
|
||||||
// After restart the last record from transaction is reprocessed, the problem is
|
final SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
|
||||||
// that the SQL Server seems to return sometimes the last change in transaction even if
|
|
||||||
// the from LSN is after the last change
|
|
||||||
final SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES + 1);
|
|
||||||
final List<SourceRecord> tableA = records.recordsForTopic("server1.dbo.tablea");
|
final List<SourceRecord> tableA = records.recordsForTopic("server1.dbo.tablea");
|
||||||
List<SourceRecord> tableB = records.recordsForTopic("server1.dbo.tableb");
|
List<SourceRecord> tableB = records.recordsForTopic("server1.dbo.tableb");
|
||||||
if (tableB != null && tableB.size() == RECORDS_PER_TABLE + 1) {
|
|
||||||
tableB = tableB.subList(1, RECORDS_PER_TABLE + 1);
|
|
||||||
}
|
|
||||||
Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE);
|
Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE);
|
||||||
Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE);
|
Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE);
|
||||||
|
|
||||||
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
|
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
|
||||||
final int id = i + ID_RESTART;
|
final int id = i + ID_RESTART;
|
||||||
final SourceRecord recordA = tableA.get(i);
|
final SourceRecord recordA = tableA.get(i);
|
||||||
|
Loading…
Reference in New Issue
Block a user