diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java index 8f084125a..5c7d2bd06 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java @@ -6,7 +6,6 @@ package io.debezium.connector.sqlserver; -import java.math.BigDecimal; import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.SQLException; @@ -16,18 +15,12 @@ import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Optional; -import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; -import org.apache.kafka.connect.data.Field; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,11 +33,8 @@ import io.debezium.relational.ColumnEditor; import io.debezium.relational.Table; import io.debezium.relational.TableId; -import io.debezium.relational.ValueConverter; import io.debezium.util.BoundedConcurrentHashMap; import io.debezium.util.Clock; -import io.debezium.util.HexConverter; -import microsoft.sql.DateTimeOffset; /** * {@link JdbcConnection} extension to be used with Microsoft SQL Server @@ -87,30 +77,13 @@ public class SqlServerConnection extends JdbcConnection { private final ZoneId transactionTimezone; private final SourceTimestampMode sourceTimestampMode; private final Clock clock; - private final SqlServerValueConverters valueConverters; public static interface ResultSetExtractor { T apply(ResultSet rs) throws SQLException; } - /** - * Converts JDBC string representation of a default column value to an object. - */ - @FunctionalInterface - public interface DefaultValueMapper { - - /** - * Parses string to an object. - * @param value string representation - * @return value - * @throws Exception if there is an parsing error - */ - Object parse(String value) throws Exception; - } - private final BoundedConcurrentHashMap lsnToInstantCache; - - private final Map defaultValueMappers; + private final SqlServerDefaultValueConverter defaultValueConverter; /** * Creates a new connection using the supplied configuration. @@ -124,7 +97,6 @@ public interface DefaultValueMapper { */ public SqlServerConnection(Configuration config, Clock clock, SourceTimestampMode sourceTimestampMode, SqlServerValueConverters valueConverters) { super(config, FACTORY); - this.valueConverters = valueConverters; lsnToInstantCache = new BoundedConcurrentHashMap<>(100); realDatabaseName = retrieveRealDatabaseName(); boolean supportsAtTimeZone = supportsAtTimeZone(); @@ -132,7 +104,7 @@ public SqlServerConnection(Configuration config, Clock clock, SourceTimestampMod lsnToTimestamp = getLsnToTimestamp(supportsAtTimeZone); this.clock = clock; this.sourceTimestampMode = sourceTimestampMode; - defaultValueMappers = createDefaultValueMappers(); + defaultValueConverter = new SqlServerDefaultValueConverter(this::connection, valueConverters); } /** @@ -153,75 +125,6 @@ private static String getLsnToTimestamp(boolean supportsAtTimeZone) { return lsnToTimestamp; } - private Map createDefaultValueMappers() { - Map result = new HashMap<>(); - - // Exact numbers - result.put("bigint", v -> Long.parseLong(v.substring(2, v.length() - 3))); // Sample value: ((3147483648.)) - result.put("int", v -> Integer.parseInt(v.substring(2, v.length() - 2))); // Sample value: ((2147483647)) - result.put("smallint", v -> Short.parseShort(v.substring(2, v.length() - 2))); // Sample value: ((32767)) - result.put("tinyint", v -> Short.parseShort(v.substring(2, v.length() - 2))); // Sample value: ((255)) - result.put("bit", v -> v.equals("((1))")); // Either ((1)) or ((0)) - result.put("decimal", v -> new BigDecimal(v.substring(2, v.length() - 2))); // Sample value: ((100.12345)) - result.put("numeric", v -> new BigDecimal(v.substring(2, v.length() - 2))); // Sample value: ((100.12345)) - result.put("money", v -> new BigDecimal(v.substring(2, v.length() - 2))); // Sample value: ((922337203685477.58)) - result.put("smallmoney", v -> new BigDecimal(v.substring(2, v.length() - 2))); // Sample value: ((214748.3647)) - - // Approximate numerics - result.put("float", v -> Double.parseDouble(v.substring(2, v.length() - 2))); // Sample value: ((1.2345000000000000e+003)) - result.put("real", v -> Float.parseFloat(v.substring(2, v.length() - 2))); // Sample value: ((1.2345000000000000e+003)) - - // Date and time - result.put("date", v -> { // Sample value: ('2019-02-03') - String rawValue = v.substring(2, v.length() - 2); - return prepareQueryAndMap("SELECT PARSE(? AS date)", st -> st.setString(1, rawValue), - singleResultMapper(rs -> rs.getDate(1), "Parse() should return exactly one result.")); - }); - result.put("datetime", v -> { // Sample value: ('2019-01-01 00:00:00.000') - String rawValue = v.substring(2, v.length() - 2); - return prepareQueryAndMap("SELECT PARSE(? AS datetime)", st -> st.setString(1, rawValue), - singleResultMapper(rs -> rs.getTimestamp(1), "Parse() should return exactly one result.")); - }); - result.put("datetime2", v -> { // Sample value: ('2019-01-01 00:00:00.1234567') - String rawValue = v.substring(2, v.length() - 2); - return prepareQueryAndMap("SELECT PARSE(? AS datetime2)", st -> st.setString(1, rawValue), - singleResultMapper(rs -> rs.getTimestamp(1), "Parse() should return exactly one result.")); - }); - result.put("datetimeoffset", v -> { // Sample value: ('2019-01-01 00:00:00.1234567+02:00') - String rawValue = v.substring(2, v.length() - 2); - return prepareQueryAndMap("SELECT PARSE(? AS datetimeoffset)", st -> st.setString(1, rawValue), - singleResultMapper(rs -> (DateTimeOffset) rs.getObject(1), "Parse() should return exactly one result.")); - }); - result.put("smalldatetime", v -> { // Sample value: ('2019-01-01 00:00:00') - String rawValue = v.substring(2, v.length() - 2); - return prepareQueryAndMap("SELECT PARSE(? AS smalldatetime)", st -> st.setString(1, rawValue), - singleResultMapper(rs -> rs.getTimestamp(1), "Parse() should return exactly one result.")); - }); - result.put("time", v -> { // Sample value: ('2019-01-01 00:00:00') - String rawValue = v.substring(2, v.length() - 2); - return prepareQueryAndMap("SELECT PARSE(? AS time)", st -> st.setString(1, rawValue), - singleResultMapper(rs -> rs.getTime(1), "Parse() should return exactly one result.")); - }); - - // Character strings - result.put("char", v -> v.substring(2, v.length() - 2)); // Sample value: ('aaa') - result.put("text", v -> v.substring(2, v.length() - 2)); // Sample value: ('aaa') - result.put("varchar", v -> v.substring(2, v.length() - 2)); // Sample value: ('aaa') - - // Unicode character strings - result.put("nchar", v -> v.substring(2, v.length() - 2)); // Sample value: ('aaa') - result.put("ntext", v -> v.substring(2, v.length() - 2)); // Sample value: ('aaa') - result.put("nvarchar", v -> v.substring(2, v.length() - 2)); // Sample value: ('aaa') - - // Binary strings - result.put("binary", v -> HexConverter.convertFromHex(v.substring(3, v.length() - 1))); // Sample value: (0x0102030405) - result.put("image", v -> HexConverter.convertFromHex(v.substring(3, v.length() - 1))); // Sample value: (0x0102030405) - result.put("varbinary", v -> HexConverter.convertFromHex(v.substring(3, v.length() - 1))); // Sample value: (0x0102030405) - - // Other data types, such as cursor, xml or uniqueidentifier, have been omitted. - return result; - } - /** * @return the current largest log sequence number */ @@ -480,52 +383,6 @@ public Table getTableSchemaFromTable(SqlServerChangeTable changeTable) throws SQ .create(); } - @Override - protected void setDefaultValue(ColumnEditor columnEditor, String defaultValue) { - if (defaultValue == null) { - return; - } - parseDefaultValue(columnEditor.typeName(), defaultValue) - .map(rawDefaultValue -> convertDefaultValue(rawDefaultValue, columnEditor)) - .ifPresent(columnEditor::defaultValue); - } - - private Optional parseDefaultValue(String dataType, String defaultValue) { - DefaultValueMapper mapper = defaultValueMappers.get(dataType); - if (mapper == null) { - LOGGER.warn("Mapper for type '{}' not found.", dataType); - return Optional.empty(); - } - - try { - return Optional.of(mapper.parse(defaultValue)); - } - catch (Exception e) { - LOGGER.warn("Cannot parse column default value '{}' to type '{}'.", defaultValue, dataType, e); - return Optional.empty(); - } - } - - private Object convertDefaultValue(Object defaultValue, ColumnEditor columnEditor) { - final Column column = columnEditor.create(); - - // if converters is not null and the default value is not null, we need to convert default value - if (valueConverters != null && defaultValue != null) { - final SchemaBuilder schemaBuilder = valueConverters.schemaBuilder(column); - if (schemaBuilder == null) { - return defaultValue; - } - final Schema schema = schemaBuilder.build(); - // In order to get the valueConverter for this column, we have to create a field; - // The index value -1 in the field will never used when converting default value; - // So we can set any number here; - final Field field = new Field(columnEditor.name(), -1, schema); - final ValueConverter valueConverter = valueConverters.converter(columnEditor.create(), field); - return valueConverter.convert(defaultValue); - } - return defaultValue; - } - public Table getTableSchemaFromChangeTable(SqlServerChangeTable changeTable) throws SQLException { final DatabaseMetaData metadata = connection().getMetaData(); final TableId changeTableId = changeTable.getChangeTableId(); @@ -633,4 +490,13 @@ private Optional getSqlServerVersion() { throw new RuntimeException("Couldn't obtain database server version", e); } } + + @Override + protected void setDefaultValue(ColumnEditor columnEditor, String defaultValue) { + if (defaultValue != null) { + defaultValueConverter + .parseDefaultValue(columnEditor, defaultValue) + .ifPresent(columnEditor::defaultValue); + } + } } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDatabaseSchema.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDatabaseSchema.java index be0dc5739..9dc8307a8 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDatabaseSchema.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDatabaseSchema.java @@ -29,7 +29,8 @@ public class SqlServerDatabaseSchema extends HistorizedRelationalDatabaseSchema private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerDatabaseSchema.class); - public SqlServerDatabaseSchema(SqlServerConnectorConfig connectorConfig, ValueConverterProvider valueConverter, TopicSelector topicSelector, SchemaNameAdjuster schemaNameAdjuster) { + public SqlServerDatabaseSchema(SqlServerConnectorConfig connectorConfig, ValueConverterProvider valueConverter, TopicSelector topicSelector, + SchemaNameAdjuster schemaNameAdjuster) { super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), connectorConfig.getColumnFilter(), new TableSchemaBuilder( valueConverter, diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDefaultValueConverter.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDefaultValueConverter.java new file mode 100644 index 000000000..883066d31 --- /dev/null +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDefaultValueConverter.java @@ -0,0 +1,190 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.connector.sqlserver; + +import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.connector.sqlserver.SqlServerConnection.ResultSetExtractor; +import io.debezium.jdbc.JdbcConnection.StatementPreparer; +import io.debezium.relational.Column; +import io.debezium.relational.ColumnEditor; +import io.debezium.relational.ValueConverter; +import io.debezium.util.HexConverter; + +import microsoft.sql.DateTimeOffset; + +/** + * Parses and converts column default values. + */ +class SqlServerDefaultValueConverter { + + private static Logger LOGGER = LoggerFactory.getLogger(SqlServerDefaultValueConverter.class); + + /** + * Provides SQL connection instance. + */ + @FunctionalInterface + interface ConnectionProvider { + Connection get() throws SQLException; + } + + /** + * Converts JDBC string representation of a default column value to an object. + */ + @FunctionalInterface + private interface DefaultValueMapper { + + /** + * Parses string to an object. + * + * @param value string representation + * @return value + * @throws Exception if there is an parsing error + */ + Object parse(String value) throws Exception; + } + + private final ConnectionProvider connectionProvider; + private final SqlServerValueConverters valueConverters; + private final Map defaultValueMappers; + + SqlServerDefaultValueConverter(ConnectionProvider connectionProvider, SqlServerValueConverters valueConverters) { + this.connectionProvider = connectionProvider; + this.valueConverters = valueConverters; + this.defaultValueMappers = createDefaultValueMappers(); + } + + Optional parseDefaultValue(ColumnEditor columnEditor, String defaultValue) { + final String dataType = columnEditor.typeName(); + final DefaultValueMapper mapper = defaultValueMappers.get(dataType); + if (mapper == null) { + LOGGER.warn("Mapper for type '{}' not found.", dataType); + return Optional.empty(); + } + + try { + Object rawDefaultValue = mapper.parse(defaultValue); + Object convertedDefaultValue = convertDefaultValue(rawDefaultValue, columnEditor); + return Optional.of(convertedDefaultValue); + } + catch (Exception e) { + LOGGER.warn("Cannot parse column default value '{}' to type '{}'.", defaultValue, dataType, e); + return Optional.empty(); + } + } + + private Object convertDefaultValue(Object defaultValue, ColumnEditor columnEditor) { + final Column column = columnEditor.create(); + + // if converters is not null and the default value is not null, we need to convert default value + if (valueConverters != null && defaultValue != null) { + final SchemaBuilder schemaBuilder = valueConverters.schemaBuilder(column); + if (schemaBuilder == null) { + return defaultValue; + } + final Schema schema = schemaBuilder.build(); + // In order to get the valueConverter for this column, we have to create a field; + // The index value -1 in the field will never used when converting default value; + // So we can set any number here; + final Field field = new Field(columnEditor.name(), -1, schema); + final ValueConverter valueConverter = valueConverters.converter(columnEditor.create(), field); + return valueConverter.convert(defaultValue); + } + return defaultValue; + } + + private Map createDefaultValueMappers() { + final Map result = new HashMap<>(); + + // Exact numbers + result.put("bigint", v -> Long.parseLong(v.substring(2, v.length() - 3))); // Sample value: ((3147483648.)) + result.put("int", v -> Integer.parseInt(v.substring(2, v.length() - 2))); // Sample value: ((2147483647)) + result.put("smallint", v -> Short.parseShort(v.substring(2, v.length() - 2))); // Sample value: ((32767)) + result.put("tinyint", v -> Short.parseShort(v.substring(2, v.length() - 2))); // Sample value: ((255)) + result.put("bit", v -> v.equals("((1))")); // Either ((1)) or ((0)) + result.put("decimal", v -> new BigDecimal(v.substring(2, v.length() - 2))); // Sample value: ((100.12345)) + result.put("numeric", v -> new BigDecimal(v.substring(2, v.length() - 2))); // Sample value: ((100.12345)) + result.put("money", v -> new BigDecimal(v.substring(2, v.length() - 2))); // Sample value: ((922337203685477.58)) + result.put("smallmoney", v -> new BigDecimal(v.substring(2, v.length() - 2))); // Sample value: ((214748.3647)) + + // Approximate numerics + result.put("float", v -> Double.parseDouble(v.substring(2, v.length() - 2))); // Sample value: ((1.2345000000000000e+003)) + result.put("real", v -> Float.parseFloat(v.substring(2, v.length() - 2))); // Sample value: ((1.2345000000000000e+003)) + + // Date and time + result.put("date", v -> { // Sample value: ('2019-02-03') + String rawValue = v.substring(2, v.length() - 2); + return querySingleValue("SELECT PARSE(? AS date)", st -> st.setString(1, rawValue), rs -> rs.getDate(1)); + }); + result.put("datetime", v -> { // Sample value: ('2019-01-01 00:00:00.000') + String rawValue = v.substring(2, v.length() - 2); + return querySingleValue("SELECT PARSE(? AS datetime)", st -> st.setString(1, rawValue), rs -> rs.getTimestamp(1)); + }); + result.put("datetime2", v -> { // Sample value: ('2019-01-01 00:00:00.1234567') + String rawValue = v.substring(2, v.length() - 2); + return querySingleValue("SELECT PARSE(? AS datetime2)", st -> st.setString(1, rawValue), rs -> rs.getTimestamp(1)); + }); + result.put("datetimeoffset", v -> { // Sample value: ('2019-01-01 00:00:00.1234567+02:00') + String rawValue = v.substring(2, v.length() - 2); + return querySingleValue("SELECT PARSE(? AS datetimeoffset)", st -> st.setString(1, rawValue), rs -> (DateTimeOffset) rs.getObject(1)); + }); + result.put("smalldatetime", v -> { // Sample value: ('2019-01-01 00:00:00') + String rawValue = v.substring(2, v.length() - 2); + return querySingleValue("SELECT PARSE(? AS smalldatetime)", st -> st.setString(1, rawValue), rs -> rs.getTimestamp(1)); + }); + result.put("time", v -> { // Sample value: ('2019-01-01 00:00:00') + String rawValue = v.substring(2, v.length() - 2); + return querySingleValue("SELECT PARSE(? AS time)", st -> st.setString(1, rawValue), rs -> rs.getTime(1)); + }); + + // Character strings + result.put("char", v -> v.substring(2, v.length() - 2)); // Sample value: ('aaa') + result.put("text", v -> v.substring(2, v.length() - 2)); // Sample value: ('aaa') + result.put("varchar", v -> v.substring(2, v.length() - 2)); // Sample value: ('aaa') + + // Unicode character strings + result.put("nchar", v -> v.substring(2, v.length() - 2)); // Sample value: ('aaa') + result.put("ntext", v -> v.substring(2, v.length() - 2)); // Sample value: ('aaa') + result.put("nvarchar", v -> v.substring(2, v.length() - 2)); // Sample value: ('aaa') + + // Binary strings + result.put("binary", v -> HexConverter.convertFromHex(v.substring(3, v.length() - 1))); // Sample value: (0x0102030405) + result.put("image", v -> HexConverter.convertFromHex(v.substring(3, v.length() - 1))); // Sample value: (0x0102030405) + result.put("varbinary", v -> HexConverter.convertFromHex(v.substring(3, v.length() - 1))); // Sample value: (0x0102030405) + + // Other data types, such as cursor, xml or uniqueidentifier, have been omitted. + return result; + } + + private T querySingleValue(String queryString, StatementPreparer preparer, ResultSetExtractor extractor) throws SQLException { + PreparedStatement preparedStatement = connectionProvider.get().prepareStatement(queryString); + preparer.accept(preparedStatement); + try (ResultSet resultSet = preparedStatement.executeQuery()) { + if (resultSet.next()) { + final T result = extractor.apply(resultSet); + if (!resultSet.next()) { + return result; + } + } + throw new IllegalStateException("Exactly one result expected."); + } + } + +} diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerChangeTableSetIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerChangeTableSetIT.java index 5f6d29358..8529d61fd 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerChangeTableSetIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerChangeTableSetIT.java @@ -754,7 +754,7 @@ public void changeColumn() throws Exception { @Test public void addDefaultValue() throws Exception { final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) .build(); start(SqlServerConnector.class, config); @@ -780,7 +780,7 @@ public void alterDefaultValue() throws Exception { TestHelper.enableTableCdc(connection, "table_dv"); final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) .build(); start(SqlServerConnector.class, config); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java index 2307ac3bd..0873d8cfe 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java @@ -19,6 +19,7 @@ import org.junit.Test; import io.debezium.connector.sqlserver.util.TestHelper; +import io.debezium.doc.FixFor; import io.debezium.relational.Column; import io.debezium.relational.Table; import io.debezium.relational.TableId; @@ -105,6 +106,7 @@ public void shouldEnableCdcWithWrapperFunctionsForTable() throws Exception { } @Test + @FixFor("DBZ-1015") public void shouldProperlyGetDefaultColumnValues() throws Exception { try (SqlServerConnection connection = TestHelper.adminConnection()) { connection.connect(); @@ -176,7 +178,7 @@ public void shouldProperlyGetDefaultColumnValues() throws Exception { // and issue a test call to a CDC wrapper function Thread.sleep(5_000); // Need to wait to make sure the min_lsn is available - ChangeTable changeTable = new ChangeTable(new TableId("testDB", "dbo", "table_with_defaults"), + SqlServerChangeTable changeTable = new SqlServerChangeTable(new TableId("testDB", "dbo", "table_with_defaults"), null, 0, null, null); Table table = connection.getTableSchemaFromTable(changeTable); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index 65eeb2256..6c74ace49 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -1431,7 +1431,8 @@ public void shouldCaptureTableSchema() throws SQLException, InterruptedException assertConnectorIsRunning(); TestHelper.waitForSnapshotToBeCompleted(); - connection.execute("INSERT INTO table_schema_test (key_cola, key_colb, cola, colb, colc, cold) VALUES(1, 'a', 100, '2019-01-01 10:20:39.1234567 +02:00', 'some_value', 100.20)"); + connection.execute( + "INSERT INTO table_schema_test (key_cola, key_colb, cola, colb, colc, cold) VALUES(1, 'a', 100, '2019-01-01 10:20:39.1234567 +02:00', 'some_value', 100.20)"); List records = consumeRecordsByTopic(1).recordsForTopic("server1.dbo.table_schema_test"); assertThat(records).hasSize(1); @@ -1440,19 +1441,19 @@ public void shouldCaptureTableSchema() throws SQLException, InterruptedException .name("server1.dbo.table_schema_test.Key") .field("key_cola", Schema.INT32_SCHEMA) .field("key_colb", Schema.STRING_SCHEMA) - .build() - ) + .build()) .valueAfterFieldSchemaIsEqualTo(SchemaBuilder.struct() .optional() .name("server1.dbo.table_schema_test.Value") .field("key_cola", Schema.INT32_SCHEMA) .field("key_colb", Schema.STRING_SCHEMA) .field("cola", Schema.INT32_SCHEMA) - .field("colb", SchemaBuilder.string().name("io.debezium.time.ZonedTimestamp").required().defaultValue("2019-01-01T12:34:56.1234567+04:00").version(1).build()) + .field("colb", + SchemaBuilder.string().name("io.debezium.time.ZonedTimestamp").required().defaultValue("2019-01-01T12:34:56.1234567+04:00").version(1) + .build()) .field("colc", SchemaBuilder.string().optional().defaultValue("default_value").build()) .field("cold", Schema.OPTIONAL_FLOAT64_SCHEMA) - .build() - ); + .build()); stopConnector(); } diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java index 55cbe996b..6b5ef321d 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java @@ -208,11 +208,13 @@ private static void dropTestDatabase(SqlServerConnection connection) throws SQLE } public static SqlServerConnection adminConnection() { - return new SqlServerConnection(TestHelper.adminJdbcConfig(), Clock.system(), SourceTimestampMode.getDefaultMode(), new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE)); + return new SqlServerConnection(TestHelper.adminJdbcConfig(), Clock.system(), SourceTimestampMode.getDefaultMode(), + new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE)); } public static SqlServerConnection testConnection() { - return new SqlServerConnection(TestHelper.defaultJdbcConfig(), Clock.system(), SourceTimestampMode.getDefaultMode(), new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE)); + return new SqlServerConnection(TestHelper.defaultJdbcConfig(), Clock.system(), SourceTimestampMode.getDefaultMode(), + new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE)); } /** diff --git a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java index 5287794e6..375559028 100644 --- a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java +++ b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java @@ -1112,12 +1112,8 @@ protected Optional readTableColumn(ResultSet columnMetadata, Table column.generated("YES".equalsIgnoreCase(autogenerated)); column.nativeType(resolveNativeType(column.typeName())); -<<<<<<< HEAD column.jdbcType(resolveJdbcType(columnMetadata.getInt(5), column.nativeType())); - parseDefaultValue(columnType, columnMetadata.getString(13)).ifPresent(column::defaultValue); -======= setDefaultValue(column, columnMetadata.getString(13)); ->>>>>>> DBZ-1491 Parse temporal values using queries on database return Optional.of(column); }