DBZ-1491 Refactor after review
This commit is contained in:
parent
825ffe3848
commit
45a6f9bef1
@ -78,10 +78,6 @@ public class SqlServerConnection extends JdbcConnection {
|
|||||||
private final SourceTimestampMode sourceTimestampMode;
|
private final SourceTimestampMode sourceTimestampMode;
|
||||||
private final Clock clock;
|
private final Clock clock;
|
||||||
|
|
||||||
public static interface ResultSetExtractor<T> {
|
|
||||||
T apply(ResultSet rs) throws SQLException;
|
|
||||||
}
|
|
||||||
|
|
||||||
private final BoundedConcurrentHashMap<Lsn, Instant> lsnToInstantCache;
|
private final BoundedConcurrentHashMap<Lsn, Instant> lsnToInstantCache;
|
||||||
private final SqlServerDefaultValueConverter defaultValueConverter;
|
private final SqlServerDefaultValueConverter defaultValueConverter;
|
||||||
|
|
||||||
@ -285,18 +281,6 @@ private String cdcNameForTable(TableId tableId) {
|
|||||||
return tableId.schema() + '_' + tableId.table();
|
return tableId.schema() + '_' + tableId.table();
|
||||||
}
|
}
|
||||||
|
|
||||||
public <T> ResultSetMapper<T> singleResultMapper(ResultSetExtractor<T> extractor, String error) throws SQLException {
|
|
||||||
return (rs) -> {
|
|
||||||
if (rs.next()) {
|
|
||||||
final T ret = extractor.apply(rs);
|
|
||||||
if (!rs.next()) {
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
throw new IllegalStateException(error);
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class CdcEnabledTable {
|
public static class CdcEnabledTable {
|
||||||
private final String tableId;
|
private final String tableId;
|
||||||
private final String captureName;
|
private final String captureName;
|
||||||
|
@ -8,9 +8,8 @@
|
|||||||
|
|
||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
import java.sql.PreparedStatement;
|
|
||||||
import java.sql.ResultSet;
|
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
@ -21,8 +20,8 @@
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import io.debezium.connector.sqlserver.SqlServerConnection.ResultSetExtractor;
|
import io.debezium.annotation.ThreadSafe;
|
||||||
import io.debezium.jdbc.JdbcConnection.StatementPreparer;
|
import io.debezium.jdbc.JdbcConnection;
|
||||||
import io.debezium.relational.Column;
|
import io.debezium.relational.Column;
|
||||||
import io.debezium.relational.ColumnEditor;
|
import io.debezium.relational.ColumnEditor;
|
||||||
import io.debezium.relational.ValueConverter;
|
import io.debezium.relational.ValueConverter;
|
||||||
@ -33,6 +32,7 @@
|
|||||||
/**
|
/**
|
||||||
* Parses and converts column default values.
|
* Parses and converts column default values.
|
||||||
*/
|
*/
|
||||||
|
@ThreadSafe
|
||||||
class SqlServerDefaultValueConverter {
|
class SqlServerDefaultValueConverter {
|
||||||
|
|
||||||
private static Logger LOGGER = LoggerFactory.getLogger(SqlServerDefaultValueConverter.class);
|
private static Logger LOGGER = LoggerFactory.getLogger(SqlServerDefaultValueConverter.class);
|
||||||
@ -68,7 +68,7 @@ private interface DefaultValueMapper {
|
|||||||
SqlServerDefaultValueConverter(ConnectionProvider connectionProvider, SqlServerValueConverters valueConverters) {
|
SqlServerDefaultValueConverter(ConnectionProvider connectionProvider, SqlServerValueConverters valueConverters) {
|
||||||
this.connectionProvider = connectionProvider;
|
this.connectionProvider = connectionProvider;
|
||||||
this.valueConverters = valueConverters;
|
this.valueConverters = valueConverters;
|
||||||
this.defaultValueMappers = createDefaultValueMappers();
|
this.defaultValueMappers = Collections.unmodifiableMap(createDefaultValueMappers());
|
||||||
}
|
}
|
||||||
|
|
||||||
Optional<Object> parseDefaultValue(ColumnEditor columnEditor, String defaultValue) {
|
Optional<Object> parseDefaultValue(ColumnEditor columnEditor, String defaultValue) {
|
||||||
@ -131,27 +131,29 @@ private Map<String, DefaultValueMapper> createDefaultValueMappers() {
|
|||||||
// Date and time
|
// Date and time
|
||||||
result.put("date", v -> { // Sample value: ('2019-02-03')
|
result.put("date", v -> { // Sample value: ('2019-02-03')
|
||||||
String rawValue = v.substring(2, v.length() - 2);
|
String rawValue = v.substring(2, v.length() - 2);
|
||||||
return querySingleValue("SELECT PARSE(? AS date)", st -> st.setString(1, rawValue), rs -> rs.getDate(1));
|
return JdbcConnection.querySingleValue(connectionProvider.get(), "SELECT PARSE(? AS date)", st -> st.setString(1, rawValue), rs -> rs.getDate(1));
|
||||||
});
|
});
|
||||||
result.put("datetime", v -> { // Sample value: ('2019-01-01 00:00:00.000')
|
result.put("datetime", v -> { // Sample value: ('2019-01-01 00:00:00.000')
|
||||||
String rawValue = v.substring(2, v.length() - 2);
|
String rawValue = v.substring(2, v.length() - 2);
|
||||||
return querySingleValue("SELECT PARSE(? AS datetime)", st -> st.setString(1, rawValue), rs -> rs.getTimestamp(1));
|
return JdbcConnection.querySingleValue(connectionProvider.get(), "SELECT PARSE(? AS datetime)", st -> st.setString(1, rawValue), rs -> rs.getTimestamp(1));
|
||||||
});
|
});
|
||||||
result.put("datetime2", v -> { // Sample value: ('2019-01-01 00:00:00.1234567')
|
result.put("datetime2", v -> { // Sample value: ('2019-01-01 00:00:00.1234567')
|
||||||
String rawValue = v.substring(2, v.length() - 2);
|
String rawValue = v.substring(2, v.length() - 2);
|
||||||
return querySingleValue("SELECT PARSE(? AS datetime2)", st -> st.setString(1, rawValue), rs -> rs.getTimestamp(1));
|
return JdbcConnection.querySingleValue(connectionProvider.get(), "SELECT PARSE(? AS datetime2)", st -> st.setString(1, rawValue), rs -> rs.getTimestamp(1));
|
||||||
});
|
});
|
||||||
result.put("datetimeoffset", v -> { // Sample value: ('2019-01-01 00:00:00.1234567+02:00')
|
result.put("datetimeoffset", v -> { // Sample value: ('2019-01-01 00:00:00.1234567+02:00')
|
||||||
String rawValue = v.substring(2, v.length() - 2);
|
String rawValue = v.substring(2, v.length() - 2);
|
||||||
return querySingleValue("SELECT PARSE(? AS datetimeoffset)", st -> st.setString(1, rawValue), rs -> (DateTimeOffset) rs.getObject(1));
|
return JdbcConnection.querySingleValue(connectionProvider.get(), "SELECT PARSE(? AS datetimeoffset)", st -> st.setString(1, rawValue),
|
||||||
|
rs -> (DateTimeOffset) rs.getObject(1));
|
||||||
});
|
});
|
||||||
result.put("smalldatetime", v -> { // Sample value: ('2019-01-01 00:00:00')
|
result.put("smalldatetime", v -> { // Sample value: ('2019-01-01 00:00:00')
|
||||||
String rawValue = v.substring(2, v.length() - 2);
|
String rawValue = v.substring(2, v.length() - 2);
|
||||||
return querySingleValue("SELECT PARSE(? AS smalldatetime)", st -> st.setString(1, rawValue), rs -> rs.getTimestamp(1));
|
return JdbcConnection.querySingleValue(connectionProvider.get(), "SELECT PARSE(? AS smalldatetime)", st -> st.setString(1, rawValue),
|
||||||
|
rs -> rs.getTimestamp(1));
|
||||||
});
|
});
|
||||||
result.put("time", v -> { // Sample value: ('2019-01-01 00:00:00')
|
result.put("time", v -> { // Sample value: ('2019-01-01 00:00:00')
|
||||||
String rawValue = v.substring(2, v.length() - 2);
|
String rawValue = v.substring(2, v.length() - 2);
|
||||||
return querySingleValue("SELECT PARSE(? AS time)", st -> st.setString(1, rawValue), rs -> rs.getTime(1));
|
return JdbcConnection.querySingleValue(connectionProvider.get(), "SELECT PARSE(? AS time)", st -> st.setString(1, rawValue), rs -> rs.getTime(1));
|
||||||
});
|
});
|
||||||
|
|
||||||
// Character strings
|
// Character strings
|
||||||
@ -172,19 +174,4 @@ private Map<String, DefaultValueMapper> createDefaultValueMappers() {
|
|||||||
// Other data types, such as cursor, xml or uniqueidentifier, have been omitted.
|
// Other data types, such as cursor, xml or uniqueidentifier, have been omitted.
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> T querySingleValue(String queryString, StatementPreparer preparer, ResultSetExtractor<T> extractor) throws SQLException {
|
|
||||||
PreparedStatement preparedStatement = connectionProvider.get().prepareStatement(queryString);
|
|
||||||
preparer.accept(preparedStatement);
|
|
||||||
try (ResultSet resultSet = preparedStatement.executeQuery()) {
|
|
||||||
if (resultSet.next()) {
|
|
||||||
final T result = extractor.apply(resultSet);
|
|
||||||
if (!resultSet.next()) {
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
throw new IllegalStateException("Exactly one result expected.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -24,6 +24,7 @@
|
|||||||
import io.debezium.config.Configuration;
|
import io.debezium.config.Configuration;
|
||||||
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode;
|
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode;
|
||||||
import io.debezium.connector.sqlserver.util.TestHelper;
|
import io.debezium.connector.sqlserver.util.TestHelper;
|
||||||
|
import io.debezium.doc.FixFor;
|
||||||
import io.debezium.document.Array;
|
import io.debezium.document.Array;
|
||||||
import io.debezium.document.Document;
|
import io.debezium.document.Document;
|
||||||
import io.debezium.document.DocumentReader;
|
import io.debezium.document.DocumentReader;
|
||||||
@ -774,6 +775,7 @@ public void addDefaultValue() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@FixFor("DBZ-1491")
|
||||||
public void alterDefaultValue() throws Exception {
|
public void alterDefaultValue() throws Exception {
|
||||||
connection.execute("CREATE TABLE table_dv (id int primary key, colb varchar(30))");
|
connection.execute("CREATE TABLE table_dv (id int primary key, colb varchar(30))");
|
||||||
connection.execute("ALTER TABLE dbo.table_dv ADD CONSTRAINT DV_colb DEFAULT ('default_value') FOR colb");
|
connection.execute("ALTER TABLE dbo.table_dv ADD CONSTRAINT DV_colb DEFAULT ('default_value') FOR colb");
|
||||||
|
@ -38,6 +38,8 @@
|
|||||||
import io.debezium.annotation.ThreadSafe;
|
import io.debezium.annotation.ThreadSafe;
|
||||||
import io.debezium.config.Configuration;
|
import io.debezium.config.Configuration;
|
||||||
import io.debezium.config.Field;
|
import io.debezium.config.Field;
|
||||||
|
import io.debezium.jdbc.JdbcConnection.ResultSetExtractor;
|
||||||
|
import io.debezium.jdbc.JdbcConnection.ResultSetMapper;
|
||||||
import io.debezium.relational.Column;
|
import io.debezium.relational.Column;
|
||||||
import io.debezium.relational.ColumnEditor;
|
import io.debezium.relational.ColumnEditor;
|
||||||
import io.debezium.relational.TableId;
|
import io.debezium.relational.TableId;
|
||||||
@ -104,6 +106,14 @@ public static interface Operations {
|
|||||||
void apply(Statement statement) throws SQLException;
|
void apply(Statement statement) throws SQLException;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extracts a data of resultset..
|
||||||
|
*/
|
||||||
|
@FunctionalInterface
|
||||||
|
public static interface ResultSetExtractor<T> {
|
||||||
|
T apply(ResultSet rs) throws SQLException;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a {@link ConnectionFactory} that replaces variables in the supplied URL pattern. Variables include:
|
* Create a {@link ConnectionFactory} that replaces variables in the supplied URL pattern. Variables include:
|
||||||
* <ul>
|
* <ul>
|
||||||
@ -1213,4 +1223,29 @@ protected static boolean isNullable(int jdbcNullable) {
|
|||||||
return jdbcNullable == ResultSetMetaData.columnNullable || jdbcNullable == ResultSetMetaData.columnNullableUnknown;
|
return jdbcNullable == ResultSetMetaData.columnNullable || jdbcNullable == ResultSetMetaData.columnNullableUnknown;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public <T> ResultSetMapper<T> singleResultMapper(ResultSetExtractor<T> extractor, String error) throws SQLException {
|
||||||
|
return (rs) -> {
|
||||||
|
if (rs.next()) {
|
||||||
|
final T ret = extractor.apply(rs);
|
||||||
|
if (!rs.next()) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new IllegalStateException(error);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T> T querySingleValue(Connection connection, String queryString, StatementPreparer preparer, ResultSetExtractor<T> extractor) throws SQLException {
|
||||||
|
final PreparedStatement preparedStatement = connection.prepareStatement(queryString);
|
||||||
|
preparer.accept(preparedStatement);
|
||||||
|
try (ResultSet resultSet = preparedStatement.executeQuery()) {
|
||||||
|
if (resultSet.next()) {
|
||||||
|
final T result = extractor.apply(resultSet);
|
||||||
|
if (!resultSet.next()) {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new IllegalStateException("Exactly one result expected.");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user