DBZ-1078 Cache sys.fn_cdc_map_lsn_to_time() results
This commit is contained in:
parent
40040e88b2
commit
efc5ba2e0e
@ -30,6 +30,7 @@
|
||||
import io.debezium.relational.ColumnEditor;
|
||||
import io.debezium.relational.Table;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.util.BoundedConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* {@link JdbcConnection} extension to be used with Microsoft SQL Server
|
||||
@ -62,6 +63,8 @@ private static interface ResultSetExtractor<T> {
|
||||
T apply(ResultSet rs) throws SQLException;
|
||||
}
|
||||
|
||||
private final BoundedConcurrentHashMap<Lsn, Instant> lsnToInstantCache;
|
||||
|
||||
/**
|
||||
* Creates a new connection using the supplied configuration.
|
||||
*
|
||||
@ -70,6 +73,7 @@ private static interface ResultSetExtractor<T> {
|
||||
*/
|
||||
public SqlServerConnection(Configuration config) {
|
||||
super(config, FACTORY);
|
||||
lsnToInstantCache = new BoundedConcurrentHashMap<>(100);
|
||||
}
|
||||
|
||||
|
||||
@ -164,12 +168,21 @@ public Instant timestampOfLsn(Lsn lsn) throws SQLException {
|
||||
if (lsn.getBinary() == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Instant cachedInstant = lsnToInstantCache.get(lsn);
|
||||
if (cachedInstant != null) {
|
||||
return cachedInstant;
|
||||
}
|
||||
|
||||
return prepareQueryAndMap(query, statement -> {
|
||||
statement.setBytes(1, lsn.getBinary());
|
||||
}, singleResultMapper(rs -> {
|
||||
final Timestamp ts = rs.getTimestamp(1);
|
||||
final Instant ret = (ts == null) ? null : ts.toInstant();
|
||||
LOGGER.trace("Timestamp of lsn {} is {}", lsn, ret);
|
||||
if (ret != null) {
|
||||
lsnToInstantCache.put(lsn, ret);
|
||||
}
|
||||
return ret;
|
||||
}, "LSN to timestamp query must return exactly one value"));
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user