diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java index 3c4a81ede..038b5f501 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java @@ -21,15 +21,19 @@ public class SqlServerOffsetContext implements OffsetContext { private static final String SERVER_PARTITION_KEY = "server"; private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed"; - private static final String OPERATION_ORDER_KEY = "order"; + private static final String EVENT_SERIAL_NO_KEY = "event_serial_no"; private final Schema sourceInfoSchema; private final SourceInfo sourceInfo; private final Map partition; private boolean snapshotCompleted; - private int operationOrder; - public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPosition position, boolean snapshot, boolean snapshotCompleted, int operationOrder) { + /** + * The index of the current event within the current transaction. + */ + private long eventSerialNo; + + public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPosition position, boolean snapshot, boolean snapshotCompleted, long eventSerialNo) { partition = Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName()); sourceInfo = new SourceInfo(connectorConfig); @@ -44,7 +48,7 @@ public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPos else { sourceInfo.setSnapshot(snapshot ? SnapshotRecord.TRUE : SnapshotRecord.FALSE); } - this.operationOrder = operationOrder; + this.eventSerialNo = eventSerialNo; } public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPosition position, boolean snapshot, boolean snapshotCompleted) { @@ -70,7 +74,7 @@ public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPos SourceInfo.COMMIT_LSN_KEY, sourceInfo.getCommitLsn().toString(), SourceInfo.CHANGE_LSN_KEY, sourceInfo.getChangeLsn() == null ? null : sourceInfo.getChangeLsn().toString(), - OPERATION_ORDER_KEY, operationOrder + EVENT_SERIAL_NO_KEY, eventSerialNo ); } } @@ -89,16 +93,16 @@ public TxLogPosition getChangePosition() { return TxLogPosition.valueOf(sourceInfo.getCommitLsn(), sourceInfo.getChangeLsn()); } - public int getOperationOrder() { - return operationOrder; + public long getEventSerialNo() { + return eventSerialNo; } public void setChangePosition(TxLogPosition position, int eventCount) { if (getChangePosition().equals(position)) { - operationOrder += eventCount; + eventSerialNo += eventCount; } else { - operationOrder = eventCount; + eventSerialNo = eventCount; } sourceInfo.setCommitLsn(position.getCommitLsn()); sourceInfo.setChangeLsn(position.getInTxLsn()); @@ -157,11 +161,13 @@ public OffsetContext load(Map offset) { boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY)); boolean snapshotCompleted = Boolean.TRUE.equals(offset.get(SNAPSHOT_COMPLETED_KEY)); - Long operationOrder = ((Long) offset.get(OPERATION_ORDER_KEY)); - if (operationOrder == null) { - operationOrder = Long.valueOf(0); + // only introduced in 0.10.Beta1, so it might be not present when upgrading from earlier versions + Long eventSerialNo = ((Long) offset.get(EVENT_SERIAL_NO_KEY)); + if (eventSerialNo == null) { + eventSerialNo = Long.valueOf(0); } - return new SqlServerOffsetContext(connectorConfig, TxLogPosition.valueOf(commitLsn, changeLsn), snapshot, snapshotCompleted, operationOrder.intValue()); + + return new SqlServerOffsetContext(connectorConfig, TxLogPosition.valueOf(commitLsn, changeLsn), snapshot, snapshotCompleted, eventSerialNo); } } @@ -172,6 +178,7 @@ public String toString() { ", sourceInfo=" + sourceInfo + ", partition=" + partition + ", snapshotCompleted=" + snapshotCompleted + + ", eventSerialNo=" + eventSerialNo + "]"; } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java index bcabebe5c..c55277478 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java @@ -91,8 +91,8 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio final AtomicReference tablesSlot = new AtomicReference(getCdcTablesToQuery()); final TxLogPosition lastProcessedPositionOnStart = offsetContext.getChangePosition(); - final int lastProcessedOrderOnStart = offsetContext.getOperationOrder(); - LOGGER.info("Last position recorded in offsets is {}[{}]", lastProcessedPositionOnStart, lastProcessedOrderOnStart); + final long lastProcessedEventSerialNoOnStart = offsetContext.getEventSerialNo(); + LOGGER.info("Last position recorded in offsets is {}[{}]", lastProcessedPositionOnStart, lastProcessedEventSerialNoOnStart); TxLogPosition lastProcessedPosition = lastProcessedPositionOnStart; @@ -137,7 +137,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio try { connection.getChangesForTables(tablesSlot.get(), fromLsn, currentMaxLsn, resultSets -> { - int operationOrderInInitialtx = 1; + long eventSerialNoInInitialTx = 1; final int tableCount = resultSets.length; final ChangeTablePointer[] changeTables = new ChangeTablePointer[tableCount]; final ChangeTable[] tables = tablesSlot.get(); @@ -174,9 +174,9 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio continue; } // After restart for change that was the last committed and operations in it before the last committed offset - if (tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) == 0 && operationOrderInInitialtx <= lastProcessedOrderOnStart) { - LOGGER.info("Skipping change {} as its order in the transaction {} is smaller than or equal to the last recorded operation {}[{}]", tableWithSmallestLsn, operationOrderInInitialtx, lastProcessedPositionOnStart, lastProcessedOrderOnStart); - operationOrderInInitialtx++; + if (tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) == 0 && eventSerialNoInInitialTx <= lastProcessedEventSerialNoOnStart) { + LOGGER.info("Skipping change {} as its order in the transaction {} is smaller than or equal to the last recorded operation {}[{}]", tableWithSmallestLsn, eventSerialNoInInitialTx, lastProcessedPositionOnStart, lastProcessedEventSerialNoOnStart); + eventSerialNoInInitialTx++; tableWithSmallestLsn.next(); continue; } diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java index 7d46b1134..bc46cbce2 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java @@ -205,7 +205,7 @@ private void testStreaming() throws SQLException, InterruptedException { Assert.assertTrue(record1.sourceOffset().containsKey("change_lsn")); Assert.assertTrue(record1.sourceOffset().containsKey("commit_lsn")); - Assert.assertTrue(record1.sourceOffset().containsKey("order")); + Assert.assertTrue(record1.sourceOffset().containsKey("event_serial_no")); assertNull(value1.get("before")); } }