DBZ-1152 Misc. clean-up;

* operationOrder -> eventSerialNo
* using long consistently
This commit is contained in:
Gunnar Morling 2019-06-11 11:30:40 +02:00 committed by Jiri Pechanec
parent 87663389ef
commit 3f00e7155f
3 changed files with 27 additions and 20 deletions

View File

@ -21,15 +21,19 @@ public class SqlServerOffsetContext implements OffsetContext {
private static final String SERVER_PARTITION_KEY = "server";
private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed";
private static final String OPERATION_ORDER_KEY = "order";
private static final String EVENT_SERIAL_NO_KEY = "event_serial_no";
private final Schema sourceInfoSchema;
private final SourceInfo sourceInfo;
private final Map<String, String> partition;
private boolean snapshotCompleted;
private int operationOrder;
public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPosition position, boolean snapshot, boolean snapshotCompleted, int operationOrder) {
/**
* The index of the current event within the current transaction.
*/
private long eventSerialNo;
public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPosition position, boolean snapshot, boolean snapshotCompleted, long eventSerialNo) {
partition = Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName());
sourceInfo = new SourceInfo(connectorConfig);
@ -44,7 +48,7 @@ public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPos
else {
sourceInfo.setSnapshot(snapshot ? SnapshotRecord.TRUE : SnapshotRecord.FALSE);
}
this.operationOrder = operationOrder;
this.eventSerialNo = eventSerialNo;
}
public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPosition position, boolean snapshot, boolean snapshotCompleted) {
@ -70,7 +74,7 @@ public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPos
SourceInfo.COMMIT_LSN_KEY, sourceInfo.getCommitLsn().toString(),
SourceInfo.CHANGE_LSN_KEY,
sourceInfo.getChangeLsn() == null ? null : sourceInfo.getChangeLsn().toString(),
OPERATION_ORDER_KEY, operationOrder
EVENT_SERIAL_NO_KEY, eventSerialNo
);
}
}
@ -89,16 +93,16 @@ public TxLogPosition getChangePosition() {
return TxLogPosition.valueOf(sourceInfo.getCommitLsn(), sourceInfo.getChangeLsn());
}
public int getOperationOrder() {
return operationOrder;
public long getEventSerialNo() {
return eventSerialNo;
}
public void setChangePosition(TxLogPosition position, int eventCount) {
if (getChangePosition().equals(position)) {
operationOrder += eventCount;
eventSerialNo += eventCount;
}
else {
operationOrder = eventCount;
eventSerialNo = eventCount;
}
sourceInfo.setCommitLsn(position.getCommitLsn());
sourceInfo.setChangeLsn(position.getInTxLsn());
@ -157,11 +161,13 @@ public OffsetContext load(Map<String, ?> offset) {
boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY));
boolean snapshotCompleted = Boolean.TRUE.equals(offset.get(SNAPSHOT_COMPLETED_KEY));
Long operationOrder = ((Long) offset.get(OPERATION_ORDER_KEY));
if (operationOrder == null) {
operationOrder = Long.valueOf(0);
// only introduced in 0.10.Beta1, so it might be not present when upgrading from earlier versions
Long eventSerialNo = ((Long) offset.get(EVENT_SERIAL_NO_KEY));
if (eventSerialNo == null) {
eventSerialNo = Long.valueOf(0);
}
return new SqlServerOffsetContext(connectorConfig, TxLogPosition.valueOf(commitLsn, changeLsn), snapshot, snapshotCompleted, operationOrder.intValue());
return new SqlServerOffsetContext(connectorConfig, TxLogPosition.valueOf(commitLsn, changeLsn), snapshot, snapshotCompleted, eventSerialNo);
}
}
@ -172,6 +178,7 @@ public String toString() {
", sourceInfo=" + sourceInfo +
", partition=" + partition +
", snapshotCompleted=" + snapshotCompleted +
", eventSerialNo=" + eventSerialNo +
"]";
}

View File

@ -91,8 +91,8 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
final AtomicReference<ChangeTable[]> tablesSlot = new AtomicReference<ChangeTable[]>(getCdcTablesToQuery());
final TxLogPosition lastProcessedPositionOnStart = offsetContext.getChangePosition();
final int lastProcessedOrderOnStart = offsetContext.getOperationOrder();
LOGGER.info("Last position recorded in offsets is {}[{}]", lastProcessedPositionOnStart, lastProcessedOrderOnStart);
final long lastProcessedEventSerialNoOnStart = offsetContext.getEventSerialNo();
LOGGER.info("Last position recorded in offsets is {}[{}]", lastProcessedPositionOnStart, lastProcessedEventSerialNoOnStart);
TxLogPosition lastProcessedPosition = lastProcessedPositionOnStart;
@ -137,7 +137,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
try {
connection.getChangesForTables(tablesSlot.get(), fromLsn, currentMaxLsn, resultSets -> {
int operationOrderInInitialtx = 1;
long eventSerialNoInInitialTx = 1;
final int tableCount = resultSets.length;
final ChangeTablePointer[] changeTables = new ChangeTablePointer[tableCount];
final ChangeTable[] tables = tablesSlot.get();
@ -174,9 +174,9 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
continue;
}
// After restart for change that was the last committed and operations in it before the last committed offset
if (tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) == 0 && operationOrderInInitialtx <= lastProcessedOrderOnStart) {
LOGGER.info("Skipping change {} as its order in the transaction {} is smaller than or equal to the last recorded operation {}[{}]", tableWithSmallestLsn, operationOrderInInitialtx, lastProcessedPositionOnStart, lastProcessedOrderOnStart);
operationOrderInInitialtx++;
if (tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) == 0 && eventSerialNoInInitialTx <= lastProcessedEventSerialNoOnStart) {
LOGGER.info("Skipping change {} as its order in the transaction {} is smaller than or equal to the last recorded operation {}[{}]", tableWithSmallestLsn, eventSerialNoInInitialTx, lastProcessedPositionOnStart, lastProcessedEventSerialNoOnStart);
eventSerialNoInInitialTx++;
tableWithSmallestLsn.next();
continue;
}

View File

@ -205,7 +205,7 @@ private void testStreaming() throws SQLException, InterruptedException {
Assert.assertTrue(record1.sourceOffset().containsKey("change_lsn"));
Assert.assertTrue(record1.sourceOffset().containsKey("commit_lsn"));
Assert.assertTrue(record1.sourceOffset().containsKey("order"));
Assert.assertTrue(record1.sourceOffset().containsKey("event_serial_no"));
assertNull(value1.get("before"));
}
}