diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java index b45d5e334..5206f55cb 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java @@ -16,6 +16,7 @@ import java.util.PriorityQueue; import java.util.Queue; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -110,6 +111,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio final TxLogPosition lastProcessedPositionOnStart = offsetContext.getChangePosition(); final long lastProcessedEventSerialNoOnStart = offsetContext.getEventSerialNo(); LOGGER.info("Last position recorded in offsets is {}[{}]", lastProcessedPositionOnStart, lastProcessedEventSerialNoOnStart); + final AtomicBoolean changesStoppedBeingMonotonic = new AtomicBoolean(false); TxLogPosition lastProcessedPosition = lastProcessedPositionOnStart; @@ -191,15 +193,28 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio tableWithSmallestLsn.next(); continue; } + + if (tableWithSmallestLsn.isNewTransaction() && changesStoppedBeingMonotonic.get()) { + LOGGER.info("Resetting changesStoppedBeingMonotonic as transaction changes"); + changesStoppedBeingMonotonic.set(false); + } + + // After restart for changes that are not monotonic to avoid data loss + if (tableWithSmallestLsn.isCurrentPositionSmallerThanPreviousPosition()) { + LOGGER.info("Disabling skipping changes due to not monotonic order of changes"); + changesStoppedBeingMonotonic.set(true); + } + // After restart for changes that were executed before the last committed offset - if (tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) < 0) { + if (!changesStoppedBeingMonotonic.get() && + tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) < 0) { LOGGER.info("Skipping change {} as its position is smaller than the last recorded position {}", tableWithSmallestLsn, lastProcessedPositionOnStart); tableWithSmallestLsn.next(); continue; } // After restart for change that was the last committed and operations in it before the last committed offset - if (tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) == 0 + if (!changesStoppedBeingMonotonic.get() && tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) == 0 && eventSerialNoInInitialTx <= lastProcessedEventSerialNoOnStart) { LOGGER.info("Skipping change {} as its order in the transaction {} is smaller than or equal to the last recorded operation {}[{}]", tableWithSmallestLsn, eventSerialNoInInitialTx, lastProcessedPositionOnStart, lastProcessedEventSerialNoOnStart); @@ -380,5 +395,11 @@ protected TxLogPosition getNextChangePosition(ResultSet resultSet) throws SQLExc return isCompleted() ? TxLogPosition.NULL : TxLogPosition.valueOf(Lsn.valueOf(resultSet.getBytes(COL_COMMIT_LSN)), Lsn.valueOf(resultSet.getBytes(COL_ROW_LSN))); } + + @Override + protected boolean isNewTransaction() throws SQLException { + return (getPreviousChangePosition() != null) && + getChangePosition().getCommitLsn().compareTo(getPreviousChangePosition().getCommitLsn()) > 0; + } } } diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index 8d4cf0d71..cffebf826 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -605,6 +605,56 @@ public void updatePrimaryKeyWithRestartInMiddle() throws Exception { stopConnector(); } + @Test + @FixFor("DBZ-2329") + public void updatePrimaryKeyTwiceWithRestartInMiddleOfTx() throws Exception { + + final Configuration config = TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) + .with(SqlServerConnectorConfig.MAX_QUEUE_SIZE, 2) + .with(SqlServerConnectorConfig.MAX_BATCH_SIZE, 1) + .with(SqlServerConnectorConfig.TOMBSTONES_ON_DELETE, false) + .build(); + + // Testing.Print.enable(); + // Wait for snapshot completion + start(SqlServerConnector.class, config, record -> { + final Struct envelope = (Struct) record.value(); + boolean stop = envelope != null && "d".equals(envelope.get("op")) && (envelope.getStruct("before").getInt32("id") == 305); + return stop; + }); + assertConnectorIsRunning(); + + consumeRecordsByTopic(1); + + connection.setAutoCommit(false); + + connection.execute("INSERT INTO tableb (id, colb) values (1,'1')"); + connection.execute("INSERT INTO tableb (id, colb) values (2,'2')"); + connection.execute("INSERT INTO tableb (id, colb) values (3,'3')"); + connection.execute("INSERT INTO tableb (id, colb) values (4,'4')"); + connection.execute("INSERT INTO tableb (id, colb) values (5,'5')"); + consumeRecordsByTopic(5); + + connection.execute("UPDATE tableb set id = colb + 300"); + connection.execute("UPDATE tableb set id = colb + 300"); + + final SourceRecords records1 = consumeRecordsByTopic(14); + + stopConnector(); + + start(SqlServerConnector.class, config); + assertConnectorIsRunning(); + final SourceRecords records2 = consumeRecordsByTopic(6); + + final List tableB = records1.recordsForTopic("server1.dbo.tableb"); + tableB.addAll(records2.recordsForTopic("server1.dbo.tableb")); + + Assertions.assertThat(tableB).hasSize(20); + + stopConnector(); + } + @Test public void streamChangesWhileStopped() throws Exception { final int RECORDS_PER_TABLE = 5; diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java index 6d5892973..81bfb08f0 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java @@ -31,6 +31,7 @@ public abstract class ChangeTableResultSet 0; + } + public boolean next() throws SQLException { completed = !resultSet.next(); + previousChangePosition = currentChangePosition; currentChangePosition = getNextChangePosition(resultSet); if (completed) { LOGGER.trace("Closing result set of change tables for table {}", changeTable); @@ -100,4 +110,11 @@ public String toString() { protected abstract int getOperation(ResultSet resultSet) throws SQLException; protected abstract T getNextChangePosition(ResultSet resultSet) throws SQLException; + + /** + * Check whether TX in currentChangePosition is newer (higher) than TX in previousChangePosition + * @return true <=> TX in currentChangePosition > TX in previousChangePosition + * @throws SQLException + */ + protected abstract boolean isNewTransaction() throws SQLException; }