DBZ-3452: source.timestamp.mode=commit imposes a significant performance penalty

Fetch LSN's timestamps along with the CDC data.
This commit is contained in:
Vadzim Ramanenka 2021-04-21 14:25:34 +03:00 committed by Gunnar Morling
parent ab2c3f41e6
commit 7adfaba483
4 changed files with 101 additions and 89 deletions

View File

@ -5,9 +5,16 @@
*/
package io.debezium.connector.sqlserver;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import io.debezium.config.EnumeratedValue;
import io.debezium.util.Clock;
/**
* Strategy for populating the source.ts_ms field in change events.
@ -17,12 +24,44 @@ public enum SourceTimestampMode implements EnumeratedValue {
/**
* This mode (default) will set the source timestamp field (ts_ms) of when the record was committed in the database.
*/
COMMIT("commit"),
COMMIT("commit") {
@Override
protected Instant getTimestamp(SqlServerConnection connection, Clock clock, ResultSet resultSet) throws SQLException {
return connection.normalize(resultSet.getTimestamp(resultSet.getMetaData().getColumnCount()));
}
/**
* Returns the query for obtaining the LSN-to-TIMESTAMP query. On SQL Server
* 2016 and newer, the query will normalize the value to UTC. This means that
* the {@link SqlServerConnection#SERVER_TIMEZONE_PROP_NAME} is not necessary to be given. The
* returned TIMESTAMP will be adjusted by the JDBC driver using this VM's TZ (as
* required by the JDBC spec), and that same TZ will be applied when converting
* the TIMESTAMP value into an {@code Instant}.
*/
@Override
protected String lsnTimestampSelectStatement(boolean supportsAtTimeZone) {
String result = ", " + SqlServerConnection.LSN_TIMESTAMP_SELECT_STATEMENT;
if (supportsAtTimeZone) {
result += " " + SqlServerConnection.AT_TIME_ZONE_UTC;
}
return result;
}
},
/**
* This mode will set the source timestamp field (ts_ms) of when the record was processed by Debezium.
*/
PROCESSING("processing");
PROCESSING("processing") {
@Override
protected Instant getTimestamp(SqlServerConnection connection, Clock clock, ResultSet resultSet) {
return clock.currentTime();
}
@Override
protected String lsnTimestampSelectStatement(boolean supportsAtTimeZone) {
return "";
}
};
private final String value;
@ -35,6 +74,41 @@ public String getValue() {
return value;
}
/**
* Returns the timestamp to be put in the source metadata of the event depending on the mode.
*
* @param connection Server connection used to fetch the result set
* @param clock System clock to source processing time from
* @param resultSet Result set representing the CDC event and its commit timestamp, if required by the mode
*/
protected abstract Instant getTimestamp(SqlServerConnection connection, Clock clock, ResultSet resultSet) throws SQLException;
/**
* Returns the SQL fragment to be embedded into the {@code GET_ALL_CHANGES_FOR_TABLE} query depending on the mode.
*
* @param supportsAtTimeZone Whether the server supports the {@code AT TIME ZONE} clause
*/
protected abstract String lsnTimestampSelectStatement(boolean supportsAtTimeZone);
/**
* Returns the names of the data columns returned by the {@code GET_ALL_CHANGES_FOR_TABLE} query.
*
* @param rsmd Result set metadata
* @param columnDataOffset Offset of the first data column in the result set
*/
protected List<String> getResultColumnNames(ResultSetMetaData rsmd, int columnDataOffset) throws SQLException {
int columnCount = rsmd.getColumnCount() - (columnDataOffset - 1);
if (equals(COMMIT)) {
// the last column in the {@code COMMIT} is the commit timestamp
columnCount -= 1;
}
final List<String> columns = new ArrayList<>(columnCount);
for (int i = 0; i < columnCount; ++i) {
columns.add(rsmd.getColumnName(columnDataOffset + i));
}
return columns;
}
public static SourceTimestampMode getDefaultMode() {
return COMMIT;
}

View File

@ -8,7 +8,6 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -46,17 +45,19 @@ public class SqlServerChangeTablePointer extends ChangeTableResultSet<SqlServerC
private ResultSetMapper<Object[]> resultSetMapper;
private final ResultSet resultSet;
private final SourceTimestampMode sourceTimestampMode;
private final int columnDataOffset;
public SqlServerChangeTablePointer(SqlServerChangeTable changeTable, ResultSet resultSet) {
this(changeTable, resultSet, COL_DATA);
}
public SqlServerChangeTablePointer(SqlServerChangeTable changeTable, ResultSet resultSet, int columnDataOffset) {
super(changeTable, resultSet, columnDataOffset);
public SqlServerChangeTablePointer(SqlServerChangeTable changeTable, ResultSet resultSet, SourceTimestampMode sourceTimestampMode) {
super(changeTable, resultSet, COL_DATA);
// Store references to these because we can't get them from our superclass
this.resultSet = resultSet;
this.columnDataOffset = columnDataOffset;
this.columnDataOffset = COL_DATA;
this.sourceTimestampMode = sourceTimestampMode;
}
protected ResultSet getResultSet() {
return resultSet;
}
@Override
@ -109,7 +110,8 @@ public Object[] getData() throws SQLException {
*/
private ResultSetMapper<Object[]> createResultSetMapper(Table table) throws SQLException {
ColumnUtils.MappedColumns columnMap = ColumnUtils.toMap(table);
final List<String> resultColumns = getResultColumnNames();
final List<String> resultColumns = sourceTimestampMode.getResultColumnNames(
resultSet.getMetaData(), columnDataOffset);
final int resultColumnCount = resultColumns.size();
final IndicesMapping indicesMapping = new IndicesMapping(columnMap.getSourceTableColumns(), resultColumns);
@ -127,15 +129,6 @@ private ResultSetMapper<Object[]> createResultSetMapper(Table table) throws SQLE
};
}
private List<String> getResultColumnNames() throws SQLException {
final int columnCount = resultSet.getMetaData().getColumnCount() - (columnDataOffset - 1);
final List<String> columns = new ArrayList<>(columnCount);
for (int i = 0; i < columnCount; ++i) {
columns.add(resultSet.getMetaData().getColumnName(columnDataOffset + i));
}
return columns;
}
private class IndicesMapping {
private final Map<Integer, Integer> mapping;

View File

@ -23,6 +23,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -40,7 +41,6 @@
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.util.BoundedConcurrentHashMap;
import io.debezium.util.Clock;
/**
@ -66,9 +66,10 @@ public class SqlServerConnection extends JdbcConnection {
private static final String GET_MIN_LSN = "SELECT sys.fn_cdc_get_min_lsn('#')";
private static final String LOCK_TABLE = "SELECT * FROM [#] WITH (TABLOCKX)";
private static final String SQL_SERVER_VERSION = "SELECT @@VERSION AS 'SQL Server Version'";
private final String lsnToTimestamp;
private static final String INCREMENT_LSN = "SELECT sys.fn_cdc_increment_lsn(?)";
private static final String GET_ALL_CHANGES_FOR_TABLE = "SELECT * FROM cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old') order by [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC";
private static final String GET_ALL_CHANGES_FOR_TABLE = "SELECT *# FROM cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old') order by [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC";
protected static final String LSN_TIMESTAMP_SELECT_STATEMENT = "sys.fn_cdc_map_lsn_to_time([__$start_lsn])";
protected static final String AT_TIME_ZONE_UTC = "AT TIME ZONE 'UTC'";
private static final String GET_LIST_OF_CDC_ENABLED_TABLES = "EXEC sys.sp_cdc_help_change_data_capture";
private static final String GET_LIST_OF_NEW_CDC_ENABLED_TABLES = "SELECT * FROM cdc.change_tables WHERE start_lsn BETWEEN ? AND ?";
private static final String GET_LIST_OF_KEY_COLUMNS = "SELECT * FROM cdc.index_columns WHERE object_id=?";
@ -88,11 +89,9 @@ public class SqlServerConnection extends JdbcConnection {
*/
private final String realDatabaseName;
private final ZoneId transactionTimezone;
private final SourceTimestampMode sourceTimestampMode;
private final Clock clock;
private final String getAllChangesForTable;
private final int queryFetchSize;
private final BoundedConcurrentHashMap<Lsn, Instant> lsnToInstantCache;
private final SqlServerDefaultValueConverter defaultValueConverter;
/**
@ -119,13 +118,11 @@ public SqlServerConnection(Configuration config, Clock clock, SourceTimestampMod
public SqlServerConnection(Configuration config, Clock clock, SourceTimestampMode sourceTimestampMode, SqlServerValueConverters valueConverters,
Supplier<ClassLoader> classLoaderSupplier) {
super(config, FACTORY, classLoaderSupplier);
lsnToInstantCache = new BoundedConcurrentHashMap<>(100);
realDatabaseName = retrieveRealDatabaseName();
boolean supportsAtTimeZone = supportsAtTimeZone();
transactionTimezone = retrieveTransactionTimezone(supportsAtTimeZone);
lsnToTimestamp = getLsnToTimestamp(supportsAtTimeZone);
this.clock = clock;
this.sourceTimestampMode = sourceTimestampMode;
getAllChangesForTable = GET_ALL_CHANGES_FOR_TABLE.replaceFirst(STATEMENTS_PLACEHOLDER,
Matcher.quoteReplacement(sourceTimestampMode.lsnTimestampSelectStatement(supportsAtTimeZone)));
defaultValueConverter = new SqlServerDefaultValueConverter(this::connection, valueConverters);
this.queryFetchSize = config().getInteger(CommonConnectorConfig.QUERY_FETCH_SIZE);
}
@ -139,24 +136,6 @@ public String connectionString() {
return connectionString(URL_PATTERN);
}
/**
* Returns the query for obtaining the LSN-to-TIMESTAMP query. On SQL Server
* 2016 and newer, the query will normalize the value to UTC. This means that
* the {@link #SERVER_TIMEZONE_PROP_NAME} is not necessary to be given. The
* returned TIMESTAMP will be adjusted by the JDBC driver using this VM's TZ (as
* required by the JDBC spec), and that same TZ will be applied when converting
* the TIMESTAMP value into an {@code Instant}.
*/
private static String getLsnToTimestamp(boolean supportsAtTimeZone) {
String lsnToTimestamp = "SELECT sys.fn_cdc_map_lsn_to_time(?)";
if (supportsAtTimeZone) {
lsnToTimestamp = lsnToTimestamp + " AT TIME ZONE 'UTC'";
}
return lsnToTimestamp;
}
/**
* @return the current largest log sequence number
*/
@ -208,7 +187,7 @@ public Lsn getMinLsn(String changeTableName) throws SQLException {
* @throws SQLException
*/
public void getChangesForTable(TableId tableId, Lsn fromLsn, Lsn toLsn, ResultSetConsumer consumer) throws SQLException {
final String query = GET_ALL_CHANGES_FOR_TABLE.replace(STATEMENTS_PLACEHOLDER, cdcNameForTable(tableId));
final String query = getAllChangesForTable.replace(STATEMENTS_PLACEHOLDER, cdcNameForTable(tableId));
prepareQuery(query, statement -> {
statement.setBytes(1, fromLsn.getBinary());
statement.setBytes(2, toLsn.getBinary());
@ -231,7 +210,7 @@ public void getChangesForTables(SqlServerChangeTable[] changeTables, Lsn interva
int idx = 0;
for (SqlServerChangeTable changeTable : changeTables) {
final String query = GET_ALL_CHANGES_FOR_TABLE.replace(STATEMENTS_PLACEHOLDER, changeTable.getCaptureInstance());
final String query = getAllChangesForTable.replace(STATEMENTS_PLACEHOLDER, changeTable.getCaptureInstance());
queries[idx] = query;
// If the table was added in the middle of queried buffer we need
// to adjust from to the first LSN available
@ -273,43 +252,7 @@ public Lsn incrementLsn(Lsn lsn) throws SQLException {
}, "Increment LSN query must return exactly one value"));
}
/**
* Map a commit LSN to a point in time when the commit happened.
*
* @param lsn - LSN of the commit
* @return time when the commit was recorded into the database log or the
* current time, depending on the setting for the "source timestamp
* mode" option
* @throws SQLException
*/
public Instant timestampOfLsn(Lsn lsn) throws SQLException {
if (SourceTimestampMode.PROCESSING.equals(sourceTimestampMode)) {
return clock.currentTime();
}
if (lsn.getBinary() == null) {
return null;
}
Instant cachedInstant = lsnToInstantCache.get(lsn);
if (cachedInstant != null) {
return cachedInstant;
}
return prepareQueryAndMap(lsnToTimestamp, statement -> {
statement.setBytes(1, lsn.getBinary());
}, singleResultMapper(rs -> {
final Timestamp ts = rs.getTimestamp(1);
Instant ret = (ts == null) ? null : normalize(ts);
LOGGER.trace("Timestamp of lsn {} is {}", lsn, ret);
if (ret != null) {
lsnToInstantCache.put(lsn, ret);
}
return ret;
}, "LSN to timestamp query must return exactly one value"));
}
private Instant normalize(Timestamp timestamp) {
protected Instant normalize(Timestamp timestamp) {
Instant instant = timestamp.toInstant();
// in case the incoming timestamp was not based on UTC, shift it as per the

View File

@ -169,7 +169,8 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
final SqlServerChangeTable[] tables = tablesSlot.get();
for (int i = 0; i < tableCount; i++) {
changeTables[i] = new SqlServerChangeTablePointer(tables[i], resultSets[i]);
changeTables[i] = new SqlServerChangeTablePointer(tables[i], resultSets[i],
connectorConfig.getSourceTimestampMode());
changeTables[i].next();
}
@ -256,7 +257,8 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
offsetContext.setChangePosition(tableWithSmallestLsn.getChangePosition(), eventCount);
offsetContext.event(
tableWithSmallestLsn.getChangeTable().getSourceTableId(),
metadataConnection.timestampOfLsn(tableWithSmallestLsn.getChangePosition().getCommitLsn()));
connectorConfig.getSourceTimestampMode().getTimestamp(
metadataConnection, clock, tableWithSmallestLsn.getResultSet()));
dispatcher
.dispatchDataChangeEvent(