DBZ-2329 Do not skip events after restart when stream is not monotonic

This commit is contained in:
Lukasz Korzeniowski 2020-07-21 12:52:25 +02:00 committed by Gunnar Morling
parent c708a89b23
commit 1ada5c4b23
3 changed files with 90 additions and 2 deletions

View File

@ -16,6 +16,7 @@
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -110,6 +111,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
final TxLogPosition lastProcessedPositionOnStart = offsetContext.getChangePosition();
final long lastProcessedEventSerialNoOnStart = offsetContext.getEventSerialNo();
LOGGER.info("Last position recorded in offsets is {}[{}]", lastProcessedPositionOnStart, lastProcessedEventSerialNoOnStart);
final AtomicBoolean changesStoppedBeingMonotonic = new AtomicBoolean(false);
TxLogPosition lastProcessedPosition = lastProcessedPositionOnStart;
@ -191,15 +193,28 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
tableWithSmallestLsn.next();
continue;
}
if (tableWithSmallestLsn.isNewTransaction() && changesStoppedBeingMonotonic.get()) {
LOGGER.info("Resetting changesStoppedBeingMonotonic as transaction changes");
changesStoppedBeingMonotonic.set(false);
}
// After restart for changes that are not monotonic to avoid data loss
if (tableWithSmallestLsn.isCurrentPositionSmallerThanPreviousPosition()) {
LOGGER.info("Disabling skipping changes due to not monotonic order of changes");
changesStoppedBeingMonotonic.set(true);
}
// After restart for changes that were executed before the last committed offset
if (tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) < 0) {
if (!changesStoppedBeingMonotonic.get() &&
tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) < 0) {
LOGGER.info("Skipping change {} as its position is smaller than the last recorded position {}", tableWithSmallestLsn,
lastProcessedPositionOnStart);
tableWithSmallestLsn.next();
continue;
}
// After restart for change that was the last committed and operations in it before the last committed offset
if (tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) == 0
if (!changesStoppedBeingMonotonic.get() && tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) == 0
&& eventSerialNoInInitialTx <= lastProcessedEventSerialNoOnStart) {
LOGGER.info("Skipping change {} as its order in the transaction {} is smaller than or equal to the last recorded operation {}[{}]",
tableWithSmallestLsn, eventSerialNoInInitialTx, lastProcessedPositionOnStart, lastProcessedEventSerialNoOnStart);
@ -380,5 +395,11 @@ protected TxLogPosition getNextChangePosition(ResultSet resultSet) throws SQLExc
return isCompleted() ? TxLogPosition.NULL
: TxLogPosition.valueOf(Lsn.valueOf(resultSet.getBytes(COL_COMMIT_LSN)), Lsn.valueOf(resultSet.getBytes(COL_ROW_LSN)));
}
@Override
protected boolean isNewTransaction() throws SQLException {
return (getPreviousChangePosition() != null) &&
getChangePosition().getCommitLsn().compareTo(getPreviousChangePosition().getCommitLsn()) > 0;
}
}
}

View File

@ -605,6 +605,56 @@ public void updatePrimaryKeyWithRestartInMiddle() throws Exception {
stopConnector();
}
@Test
@FixFor("DBZ-2329")
public void updatePrimaryKeyTwiceWithRestartInMiddleOfTx() throws Exception {
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(SqlServerConnectorConfig.MAX_QUEUE_SIZE, 2)
.with(SqlServerConnectorConfig.MAX_BATCH_SIZE, 1)
.with(SqlServerConnectorConfig.TOMBSTONES_ON_DELETE, false)
.build();
// Testing.Print.enable();
// Wait for snapshot completion
start(SqlServerConnector.class, config, record -> {
final Struct envelope = (Struct) record.value();
boolean stop = envelope != null && "d".equals(envelope.get("op")) && (envelope.getStruct("before").getInt32("id") == 305);
return stop;
});
assertConnectorIsRunning();
consumeRecordsByTopic(1);
connection.setAutoCommit(false);
connection.execute("INSERT INTO tableb (id, colb) values (1,'1')");
connection.execute("INSERT INTO tableb (id, colb) values (2,'2')");
connection.execute("INSERT INTO tableb (id, colb) values (3,'3')");
connection.execute("INSERT INTO tableb (id, colb) values (4,'4')");
connection.execute("INSERT INTO tableb (id, colb) values (5,'5')");
consumeRecordsByTopic(5);
connection.execute("UPDATE tableb set id = colb + 300");
connection.execute("UPDATE tableb set id = colb + 300");
final SourceRecords records1 = consumeRecordsByTopic(14);
stopConnector();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
final SourceRecords records2 = consumeRecordsByTopic(6);
final List<SourceRecord> tableB = records1.recordsForTopic("server1.dbo.tableb");
tableB.addAll(records2.recordsForTopic("server1.dbo.tableb"));
Assertions.assertThat(tableB).hasSize(20);
stopConnector();
}
@Test
public void streamChangesWhileStopped() throws Exception {
final int RECORDS_PER_TABLE = 5;

View File

@ -31,6 +31,7 @@ public abstract class ChangeTableResultSet<C extends ChangeTable, T extends Comp
private final int columnDataOffset;
private boolean completed = false;
private T currentChangePosition;
private T previousChangePosition;
public ChangeTableResultSet(C changeTable, ResultSet resultSet, int columnDataOffset) {
this.changeTable = changeTable;
@ -46,12 +47,21 @@ public T getChangePosition() throws SQLException {
return currentChangePosition;
}
protected T getPreviousChangePosition() {
return previousChangePosition;
}
public int getOperation() throws SQLException {
return getOperation(resultSet);
}
public boolean isCurrentPositionSmallerThanPreviousPosition() {
return (previousChangePosition != null) && previousChangePosition.compareTo(currentChangePosition) > 0;
}
public boolean next() throws SQLException {
completed = !resultSet.next();
previousChangePosition = currentChangePosition;
currentChangePosition = getNextChangePosition(resultSet);
if (completed) {
LOGGER.trace("Closing result set of change tables for table {}", changeTable);
@ -100,4 +110,11 @@ public String toString() {
protected abstract int getOperation(ResultSet resultSet) throws SQLException;
protected abstract T getNextChangePosition(ResultSet resultSet) throws SQLException;
/**
* Check whether TX in currentChangePosition is newer (higher) than TX in previousChangePosition
* @return true <=> TX in currentChangePosition > TX in previousChangePosition
* @throws SQLException
*/
protected abstract boolean isNewTransaction() throws SQLException;
}