DBZ-2120 move check null logic in query to java code

This commit is contained in:
linhnh 2020-06-12 12:08:01 +07:00 committed by Jiri Pechanec
parent d4cefaa1c2
commit 39ccfeb013

View File

@ -51,11 +51,12 @@ public class SqlServerConnection extends JdbcConnection {
private static final String STATEMENTS_PLACEHOLDER = "#";
private static final String GET_MAX_LSN = "SELECT sys.fn_cdc_get_max_lsn()";
private static final String GET_MIN_LSN = "SELECT sys.fn_cdc_get_min_lsn('#')";
private static final String LOCK_TABLE = "SELECT * FROM [#] WITH (TABLOCKX)";
private static final String SQL_SERVER_VERSION = "SELECT @@VERSION AS 'SQL Server Version'";
private final String lsnToTimestamp;
private static final String INCREMENT_LSN = "SELECT sys.fn_cdc_increment_lsn(?)";
private static final String GET_ALL_CHANGES_FOR_TABLE = "SELECT * FROM cdc.[fn_cdc_get_all_changes_#](ISNULL(?,sys.fn_cdc_get_min_lsn('#')), ?, N'all update old')";
private static final String GET_ALL_CHANGES_FOR_TABLE = "SELECT * FROM cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old')";
private static final String GET_LIST_OF_CDC_ENABLED_TABLES = "EXEC sys.sp_cdc_help_change_data_capture";
private static final String GET_LIST_OF_NEW_CDC_ENABLED_TABLES = "SELECT * FROM cdc.change_tables WHERE start_lsn BETWEEN ? AND ?";
private static final String GET_LIST_OF_KEY_COLUMNS = "SELECT * FROM cdc.index_columns WHERE object_id=?";
@ -128,6 +129,18 @@ public Lsn getMaxLsn() throws SQLException {
}, "Maximum LSN query must return exactly one value"));
}
/**
* @return the smallest log sequence number of table
*/
public Lsn getMinLsn(String changeTableName) throws SQLException {
String query = GET_MIN_LSN.replace(STATEMENTS_PLACEHOLDER, changeTableName);
return queryAndMap(query, singleResultMapper(rs -> {
final Lsn ret = Lsn.valueOf(rs.getBytes(1));
LOGGER.trace("Current minimum lsn is {}", ret);
return ret;
}, "Minimum LSN query must return exactly one value"));
}
/**
* Provides all changes recorded by the SQL Server CDC capture process for a given table.
*
@ -165,7 +178,7 @@ public void getChangesForTables(SqlServerChangeTable[] changeTables, Lsn interva
queries[idx] = query;
// If the table was added in the middle of queried buffer we need
// to adjust from to the first LSN available
final Lsn fromLsn = changeTable.getStartLsn().compareTo(intervalFromLsn) > 0 ? changeTable.getStartLsn() : intervalFromLsn;
final Lsn fromLsn = getFromLsn(changeTable, intervalFromLsn);
LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn);
preparers[idx] = statement -> {
statement.setBytes(1, fromLsn.getBinary());
@ -177,6 +190,11 @@ public void getChangesForTables(SqlServerChangeTable[] changeTables, Lsn interva
prepareQuery(queries, preparers, consumer);
}
private Lsn getFromLsn(ChangeTable changeTable, Lsn intervalFromLsn) throws SQLException {
Lsn fromLsn = changeTable.getStartLsn().compareTo(intervalFromLsn) > 0 ? changeTable.getStartLsn() : intervalFromLsn;
return fromLsn.getBinary() != null ? fromLsn : getMinLsn(changeTable.getCaptureInstance());
}
/**
* Obtain the next available position in the database log.
*