diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java index 5c7d2bd06..7031e85ea 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java @@ -78,10 +78,6 @@ public class SqlServerConnection extends JdbcConnection { private final SourceTimestampMode sourceTimestampMode; private final Clock clock; - public static interface ResultSetExtractor { - T apply(ResultSet rs) throws SQLException; - } - private final BoundedConcurrentHashMap lsnToInstantCache; private final SqlServerDefaultValueConverter defaultValueConverter; @@ -285,18 +281,6 @@ private String cdcNameForTable(TableId tableId) { return tableId.schema() + '_' + tableId.table(); } - public ResultSetMapper singleResultMapper(ResultSetExtractor extractor, String error) throws SQLException { - return (rs) -> { - if (rs.next()) { - final T ret = extractor.apply(rs); - if (!rs.next()) { - return ret; - } - } - throw new IllegalStateException(error); - }; - } - public static class CdcEnabledTable { private final String tableId; private final String captureName; diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDefaultValueConverter.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDefaultValueConverter.java index 883066d31..657a8e87a 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDefaultValueConverter.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDefaultValueConverter.java @@ -8,9 +8,8 @@ import java.math.BigDecimal; import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; import java.sql.SQLException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -21,8 +20,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.debezium.connector.sqlserver.SqlServerConnection.ResultSetExtractor; -import io.debezium.jdbc.JdbcConnection.StatementPreparer; +import io.debezium.annotation.ThreadSafe; +import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.Column; import io.debezium.relational.ColumnEditor; import io.debezium.relational.ValueConverter; @@ -33,6 +32,7 @@ /** * Parses and converts column default values. */ +@ThreadSafe class SqlServerDefaultValueConverter { private static Logger LOGGER = LoggerFactory.getLogger(SqlServerDefaultValueConverter.class); @@ -68,7 +68,7 @@ private interface DefaultValueMapper { SqlServerDefaultValueConverter(ConnectionProvider connectionProvider, SqlServerValueConverters valueConverters) { this.connectionProvider = connectionProvider; this.valueConverters = valueConverters; - this.defaultValueMappers = createDefaultValueMappers(); + this.defaultValueMappers = Collections.unmodifiableMap(createDefaultValueMappers()); } Optional parseDefaultValue(ColumnEditor columnEditor, String defaultValue) { @@ -131,27 +131,29 @@ private Map createDefaultValueMappers() { // Date and time result.put("date", v -> { // Sample value: ('2019-02-03') String rawValue = v.substring(2, v.length() - 2); - return querySingleValue("SELECT PARSE(? AS date)", st -> st.setString(1, rawValue), rs -> rs.getDate(1)); + return JdbcConnection.querySingleValue(connectionProvider.get(), "SELECT PARSE(? AS date)", st -> st.setString(1, rawValue), rs -> rs.getDate(1)); }); result.put("datetime", v -> { // Sample value: ('2019-01-01 00:00:00.000') String rawValue = v.substring(2, v.length() - 2); - return querySingleValue("SELECT PARSE(? AS datetime)", st -> st.setString(1, rawValue), rs -> rs.getTimestamp(1)); + return JdbcConnection.querySingleValue(connectionProvider.get(), "SELECT PARSE(? AS datetime)", st -> st.setString(1, rawValue), rs -> rs.getTimestamp(1)); }); result.put("datetime2", v -> { // Sample value: ('2019-01-01 00:00:00.1234567') String rawValue = v.substring(2, v.length() - 2); - return querySingleValue("SELECT PARSE(? AS datetime2)", st -> st.setString(1, rawValue), rs -> rs.getTimestamp(1)); + return JdbcConnection.querySingleValue(connectionProvider.get(), "SELECT PARSE(? AS datetime2)", st -> st.setString(1, rawValue), rs -> rs.getTimestamp(1)); }); result.put("datetimeoffset", v -> { // Sample value: ('2019-01-01 00:00:00.1234567+02:00') String rawValue = v.substring(2, v.length() - 2); - return querySingleValue("SELECT PARSE(? AS datetimeoffset)", st -> st.setString(1, rawValue), rs -> (DateTimeOffset) rs.getObject(1)); + return JdbcConnection.querySingleValue(connectionProvider.get(), "SELECT PARSE(? AS datetimeoffset)", st -> st.setString(1, rawValue), + rs -> (DateTimeOffset) rs.getObject(1)); }); result.put("smalldatetime", v -> { // Sample value: ('2019-01-01 00:00:00') String rawValue = v.substring(2, v.length() - 2); - return querySingleValue("SELECT PARSE(? AS smalldatetime)", st -> st.setString(1, rawValue), rs -> rs.getTimestamp(1)); + return JdbcConnection.querySingleValue(connectionProvider.get(), "SELECT PARSE(? AS smalldatetime)", st -> st.setString(1, rawValue), + rs -> rs.getTimestamp(1)); }); result.put("time", v -> { // Sample value: ('2019-01-01 00:00:00') String rawValue = v.substring(2, v.length() - 2); - return querySingleValue("SELECT PARSE(? AS time)", st -> st.setString(1, rawValue), rs -> rs.getTime(1)); + return JdbcConnection.querySingleValue(connectionProvider.get(), "SELECT PARSE(? AS time)", st -> st.setString(1, rawValue), rs -> rs.getTime(1)); }); // Character strings @@ -172,19 +174,4 @@ private Map createDefaultValueMappers() { // Other data types, such as cursor, xml or uniqueidentifier, have been omitted. return result; } - - private T querySingleValue(String queryString, StatementPreparer preparer, ResultSetExtractor extractor) throws SQLException { - PreparedStatement preparedStatement = connectionProvider.get().prepareStatement(queryString); - preparer.accept(preparedStatement); - try (ResultSet resultSet = preparedStatement.executeQuery()) { - if (resultSet.next()) { - final T result = extractor.apply(resultSet); - if (!resultSet.next()) { - return result; - } - } - throw new IllegalStateException("Exactly one result expected."); - } - } - } diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerChangeTableSetIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerChangeTableSetIT.java index 8529d61fd..f6b2b6045 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerChangeTableSetIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerChangeTableSetIT.java @@ -24,6 +24,7 @@ import io.debezium.config.Configuration; import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode; import io.debezium.connector.sqlserver.util.TestHelper; +import io.debezium.doc.FixFor; import io.debezium.document.Array; import io.debezium.document.Document; import io.debezium.document.DocumentReader; @@ -774,6 +775,7 @@ public void addDefaultValue() throws Exception { } @Test + @FixFor("DBZ-1491") public void alterDefaultValue() throws Exception { connection.execute("CREATE TABLE table_dv (id int primary key, colb varchar(30))"); connection.execute("ALTER TABLE dbo.table_dv ADD CONSTRAINT DV_colb DEFAULT ('default_value') FOR colb"); diff --git a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java index 375559028..aede5a9ab 100644 --- a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java +++ b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java @@ -38,6 +38,8 @@ import io.debezium.annotation.ThreadSafe; import io.debezium.config.Configuration; import io.debezium.config.Field; +import io.debezium.jdbc.JdbcConnection.ResultSetExtractor; +import io.debezium.jdbc.JdbcConnection.ResultSetMapper; import io.debezium.relational.Column; import io.debezium.relational.ColumnEditor; import io.debezium.relational.TableId; @@ -104,6 +106,14 @@ public static interface Operations { void apply(Statement statement) throws SQLException; } + /** + * Extracts a data of resultset.. + */ + @FunctionalInterface + public static interface ResultSetExtractor { + T apply(ResultSet rs) throws SQLException; + } + /** * Create a {@link ConnectionFactory} that replaces variables in the supplied URL pattern. Variables include: *
    @@ -1213,4 +1223,29 @@ protected static boolean isNullable(int jdbcNullable) { return jdbcNullable == ResultSetMetaData.columnNullable || jdbcNullable == ResultSetMetaData.columnNullableUnknown; } + public ResultSetMapper singleResultMapper(ResultSetExtractor extractor, String error) throws SQLException { + return (rs) -> { + if (rs.next()) { + final T ret = extractor.apply(rs); + if (!rs.next()) { + return ret; + } + } + throw new IllegalStateException(error); + }; + } + + public static T querySingleValue(Connection connection, String queryString, StatementPreparer preparer, ResultSetExtractor extractor) throws SQLException { + final PreparedStatement preparedStatement = connection.prepareStatement(queryString); + preparer.accept(preparedStatement); + try (ResultSet resultSet = preparedStatement.executeQuery()) { + if (resultSet.next()) { + final T result = extractor.apply(resultSet); + if (!resultSet.next()) { + return result; + } + } + throw new IllegalStateException("Exactly one result expected."); + } + } }