DBZ-812 Rename var; guard against NULL row LSN

This commit is contained in:
Jiri Pechanec 2018-11-01 09:40:12 +01:00
parent 8e9c5d9659
commit 144748ed63
2 changed files with 26 additions and 23 deletions

View File

@ -125,52 +125,57 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
}
for (;;) {
ChangeTablePointer tableSmallestLsn = null;
ChangeTablePointer tableWithSmallestLsn = null;
for (ChangeTablePointer changeTable: changeTables) {
if (changeTable.isCompleted()) {
continue;
}
if (tableSmallestLsn == null || changeTable.compareTo(tableSmallestLsn) < 0) {
tableSmallestLsn = changeTable;
if (tableWithSmallestLsn == null || changeTable.compareTo(tableWithSmallestLsn) < 0) {
tableWithSmallestLsn = changeTable;
}
}
if (tableSmallestLsn == null) {
if (tableWithSmallestLsn == null) {
// No more LSNs available
break;
}
if (tableSmallestLsn.getRowLsn().compareTo(lastProcessedLsnOnStart) <= 0) {
LOGGER.info("Skipping change {} as its LSN is smaller than the last recorded LSN {}", tableSmallestLsn, lastProcessedLsnOnStart);
tableSmallestLsn.next();
if (!tableWithSmallestLsn.getRowLsn().isAvailable()) {
LOGGER.error("Skipping change {} as its LSN is NULL which is not expected", tableWithSmallestLsn);
tableWithSmallestLsn.next();
continue;
}
if (tableSmallestLsn.getChangeTable().getStopLsn().isAvailable() &&
tableSmallestLsn.getChangeTable().getStopLsn().compareTo(tableSmallestLsn.getRowLsn()) <= 0) {
LOGGER.debug("Skipping table change {} as its stop LSN is smaller than the last recorded LSN {}", tableSmallestLsn, tableSmallestLsn.getRowLsn());
tableSmallestLsn.next();
if (tableWithSmallestLsn.getRowLsn().compareTo(lastProcessedLsnOnStart) <= 0) {
LOGGER.info("Skipping change {} as its LSN is smaller than the last recorded LSN {}", tableWithSmallestLsn, lastProcessedLsnOnStart);
tableWithSmallestLsn.next();
continue;
}
LOGGER.trace("Processing change {}", tableSmallestLsn);
if (tableWithSmallestLsn.getChangeTable().getStopLsn().isAvailable() &&
tableWithSmallestLsn.getChangeTable().getStopLsn().compareTo(tableWithSmallestLsn.getRowLsn()) <= 0) {
LOGGER.debug("Skipping table change {} as its stop LSN is smaller than the last recorded LSN {}", tableWithSmallestLsn, tableWithSmallestLsn.getRowLsn());
tableWithSmallestLsn.next();
continue;
}
LOGGER.trace("Processing change {}", tableWithSmallestLsn);
if (!schemaChangeCheckpoints.isEmpty()) {
if (tableSmallestLsn.getRowLsn().compareTo(schemaChangeCheckpoints.peek().getStopLsn()) >= 0) {
if (tableWithSmallestLsn.getRowLsn().compareTo(schemaChangeCheckpoints.peek().getStopLsn()) >= 0) {
migrateTable(schemaChangeCheckpoints);
}
}
final TableId tableId = tableSmallestLsn.getChangeTable().getSourceTableId();
final Lsn commitLsn = tableSmallestLsn.getCommitLsn();
final Lsn rowLsn = tableSmallestLsn.getRowLsn();
final int operation = tableSmallestLsn.getOperation();
final Object[] data = tableSmallestLsn.getData();
final TableId tableId = tableWithSmallestLsn.getChangeTable().getSourceTableId();
final Lsn commitLsn = tableWithSmallestLsn.getCommitLsn();
final Lsn rowLsn = tableWithSmallestLsn.getRowLsn();
final int operation = tableWithSmallestLsn.getOperation();
final Object[] data = tableWithSmallestLsn.getData();
// UPDATE consists of two consecutive events, first event contains
// the row before it was updated and the second the row after
// it was updated
if (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) {
if (!tableSmallestLsn.next() || tableSmallestLsn.getOperation() != SqlServerChangeRecordEmitter.OP_UPDATE_AFTER) {
if (!tableWithSmallestLsn.next() || tableWithSmallestLsn.getOperation() != SqlServerChangeRecordEmitter.OP_UPDATE_AFTER) {
throw new IllegalStateException("The update before event at " + rowLsn + " for table " + tableId + " was not followed by after event.\n Please report this as a bug together with a events around given LSN.");
}
}
final Object[] dataNext = (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) ? tableSmallestLsn.getData() : null;
final Object[] dataNext = (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) ? tableWithSmallestLsn.getData() : null;
offsetContext.setChangeLsn(rowLsn);
offsetContext.setCommitLsn(commitLsn);
@ -188,7 +193,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
clock
)
);
tableSmallestLsn.next();
tableWithSmallestLsn.next();
}
});
lastProcessedLsn = currentMaxLsn;

View File

@ -5,8 +5,6 @@
*/
package io.debezium.connector.sqlserver;
import static org.junit.Assert.assertThat;
import java.sql.SQLException;
import org.apache.kafka.connect.data.Schema;