DBZ-1491 Add SqlServerDefaultValueConverter class
This commit is contained in:
parent
234341a471
commit
825ffe3848
@ -6,7 +6,6 @@
|
||||
|
||||
package io.debezium.connector.sqlserver;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.DatabaseMetaData;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
@ -16,18 +15,12 @@
|
||||
import java.time.ZoneOffset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.kafka.connect.data.Field;
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.SchemaBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -40,11 +33,8 @@
|
||||
import io.debezium.relational.ColumnEditor;
|
||||
import io.debezium.relational.Table;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.relational.ValueConverter;
|
||||
import io.debezium.util.BoundedConcurrentHashMap;
|
||||
import io.debezium.util.Clock;
|
||||
import io.debezium.util.HexConverter;
|
||||
import microsoft.sql.DateTimeOffset;
|
||||
|
||||
/**
|
||||
* {@link JdbcConnection} extension to be used with Microsoft SQL Server
|
||||
@ -87,30 +77,13 @@ public class SqlServerConnection extends JdbcConnection {
|
||||
private final ZoneId transactionTimezone;
|
||||
private final SourceTimestampMode sourceTimestampMode;
|
||||
private final Clock clock;
|
||||
private final SqlServerValueConverters valueConverters;
|
||||
|
||||
public static interface ResultSetExtractor<T> {
|
||||
T apply(ResultSet rs) throws SQLException;
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts JDBC string representation of a default column value to an object.
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface DefaultValueMapper {
|
||||
|
||||
/**
|
||||
* Parses string to an object.
|
||||
* @param value string representation
|
||||
* @return value
|
||||
* @throws Exception if there is an parsing error
|
||||
*/
|
||||
Object parse(String value) throws Exception;
|
||||
}
|
||||
|
||||
private final BoundedConcurrentHashMap<Lsn, Instant> lsnToInstantCache;
|
||||
|
||||
private final Map<String, DefaultValueMapper> defaultValueMappers;
|
||||
private final SqlServerDefaultValueConverter defaultValueConverter;
|
||||
|
||||
/**
|
||||
* Creates a new connection using the supplied configuration.
|
||||
@ -124,7 +97,6 @@ public interface DefaultValueMapper {
|
||||
*/
|
||||
public SqlServerConnection(Configuration config, Clock clock, SourceTimestampMode sourceTimestampMode, SqlServerValueConverters valueConverters) {
|
||||
super(config, FACTORY);
|
||||
this.valueConverters = valueConverters;
|
||||
lsnToInstantCache = new BoundedConcurrentHashMap<>(100);
|
||||
realDatabaseName = retrieveRealDatabaseName();
|
||||
boolean supportsAtTimeZone = supportsAtTimeZone();
|
||||
@ -132,7 +104,7 @@ public SqlServerConnection(Configuration config, Clock clock, SourceTimestampMod
|
||||
lsnToTimestamp = getLsnToTimestamp(supportsAtTimeZone);
|
||||
this.clock = clock;
|
||||
this.sourceTimestampMode = sourceTimestampMode;
|
||||
defaultValueMappers = createDefaultValueMappers();
|
||||
defaultValueConverter = new SqlServerDefaultValueConverter(this::connection, valueConverters);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -153,75 +125,6 @@ private static String getLsnToTimestamp(boolean supportsAtTimeZone) {
|
||||
return lsnToTimestamp;
|
||||
}
|
||||
|
||||
private Map<String, DefaultValueMapper> createDefaultValueMappers() {
|
||||
Map<String, DefaultValueMapper> result = new HashMap<>();
|
||||
|
||||
// Exact numbers
|
||||
result.put("bigint", v -> Long.parseLong(v.substring(2, v.length() - 3))); // Sample value: ((3147483648.))
|
||||
result.put("int", v -> Integer.parseInt(v.substring(2, v.length() - 2))); // Sample value: ((2147483647))
|
||||
result.put("smallint", v -> Short.parseShort(v.substring(2, v.length() - 2))); // Sample value: ((32767))
|
||||
result.put("tinyint", v -> Short.parseShort(v.substring(2, v.length() - 2))); // Sample value: ((255))
|
||||
result.put("bit", v -> v.equals("((1))")); // Either ((1)) or ((0))
|
||||
result.put("decimal", v -> new BigDecimal(v.substring(2, v.length() - 2))); // Sample value: ((100.12345))
|
||||
result.put("numeric", v -> new BigDecimal(v.substring(2, v.length() - 2))); // Sample value: ((100.12345))
|
||||
result.put("money", v -> new BigDecimal(v.substring(2, v.length() - 2))); // Sample value: ((922337203685477.58))
|
||||
result.put("smallmoney", v -> new BigDecimal(v.substring(2, v.length() - 2))); // Sample value: ((214748.3647))
|
||||
|
||||
// Approximate numerics
|
||||
result.put("float", v -> Double.parseDouble(v.substring(2, v.length() - 2))); // Sample value: ((1.2345000000000000e+003))
|
||||
result.put("real", v -> Float.parseFloat(v.substring(2, v.length() - 2))); // Sample value: ((1.2345000000000000e+003))
|
||||
|
||||
// Date and time
|
||||
result.put("date", v -> { // Sample value: ('2019-02-03')
|
||||
String rawValue = v.substring(2, v.length() - 2);
|
||||
return prepareQueryAndMap("SELECT PARSE(? AS date)", st -> st.setString(1, rawValue),
|
||||
singleResultMapper(rs -> rs.getDate(1), "Parse() should return exactly one result."));
|
||||
});
|
||||
result.put("datetime", v -> { // Sample value: ('2019-01-01 00:00:00.000')
|
||||
String rawValue = v.substring(2, v.length() - 2);
|
||||
return prepareQueryAndMap("SELECT PARSE(? AS datetime)", st -> st.setString(1, rawValue),
|
||||
singleResultMapper(rs -> rs.getTimestamp(1), "Parse() should return exactly one result."));
|
||||
});
|
||||
result.put("datetime2", v -> { // Sample value: ('2019-01-01 00:00:00.1234567')
|
||||
String rawValue = v.substring(2, v.length() - 2);
|
||||
return prepareQueryAndMap("SELECT PARSE(? AS datetime2)", st -> st.setString(1, rawValue),
|
||||
singleResultMapper(rs -> rs.getTimestamp(1), "Parse() should return exactly one result."));
|
||||
});
|
||||
result.put("datetimeoffset", v -> { // Sample value: ('2019-01-01 00:00:00.1234567+02:00')
|
||||
String rawValue = v.substring(2, v.length() - 2);
|
||||
return prepareQueryAndMap("SELECT PARSE(? AS datetimeoffset)", st -> st.setString(1, rawValue),
|
||||
singleResultMapper(rs -> (DateTimeOffset) rs.getObject(1), "Parse() should return exactly one result."));
|
||||
});
|
||||
result.put("smalldatetime", v -> { // Sample value: ('2019-01-01 00:00:00')
|
||||
String rawValue = v.substring(2, v.length() - 2);
|
||||
return prepareQueryAndMap("SELECT PARSE(? AS smalldatetime)", st -> st.setString(1, rawValue),
|
||||
singleResultMapper(rs -> rs.getTimestamp(1), "Parse() should return exactly one result."));
|
||||
});
|
||||
result.put("time", v -> { // Sample value: ('2019-01-01 00:00:00')
|
||||
String rawValue = v.substring(2, v.length() - 2);
|
||||
return prepareQueryAndMap("SELECT PARSE(? AS time)", st -> st.setString(1, rawValue),
|
||||
singleResultMapper(rs -> rs.getTime(1), "Parse() should return exactly one result."));
|
||||
});
|
||||
|
||||
// Character strings
|
||||
result.put("char", v -> v.substring(2, v.length() - 2)); // Sample value: ('aaa')
|
||||
result.put("text", v -> v.substring(2, v.length() - 2)); // Sample value: ('aaa')
|
||||
result.put("varchar", v -> v.substring(2, v.length() - 2)); // Sample value: ('aaa')
|
||||
|
||||
// Unicode character strings
|
||||
result.put("nchar", v -> v.substring(2, v.length() - 2)); // Sample value: ('aaa')
|
||||
result.put("ntext", v -> v.substring(2, v.length() - 2)); // Sample value: ('aaa')
|
||||
result.put("nvarchar", v -> v.substring(2, v.length() - 2)); // Sample value: ('aaa')
|
||||
|
||||
// Binary strings
|
||||
result.put("binary", v -> HexConverter.convertFromHex(v.substring(3, v.length() - 1))); // Sample value: (0x0102030405)
|
||||
result.put("image", v -> HexConverter.convertFromHex(v.substring(3, v.length() - 1))); // Sample value: (0x0102030405)
|
||||
result.put("varbinary", v -> HexConverter.convertFromHex(v.substring(3, v.length() - 1))); // Sample value: (0x0102030405)
|
||||
|
||||
// Other data types, such as cursor, xml or uniqueidentifier, have been omitted.
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the current largest log sequence number
|
||||
*/
|
||||
@ -480,52 +383,6 @@ public Table getTableSchemaFromTable(SqlServerChangeTable changeTable) throws SQ
|
||||
.create();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setDefaultValue(ColumnEditor columnEditor, String defaultValue) {
|
||||
if (defaultValue == null) {
|
||||
return;
|
||||
}
|
||||
parseDefaultValue(columnEditor.typeName(), defaultValue)
|
||||
.map(rawDefaultValue -> convertDefaultValue(rawDefaultValue, columnEditor))
|
||||
.ifPresent(columnEditor::defaultValue);
|
||||
}
|
||||
|
||||
private Optional<Object> parseDefaultValue(String dataType, String defaultValue) {
|
||||
DefaultValueMapper mapper = defaultValueMappers.get(dataType);
|
||||
if (mapper == null) {
|
||||
LOGGER.warn("Mapper for type '{}' not found.", dataType);
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
try {
|
||||
return Optional.of(mapper.parse(defaultValue));
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOGGER.warn("Cannot parse column default value '{}' to type '{}'.", defaultValue, dataType, e);
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
private Object convertDefaultValue(Object defaultValue, ColumnEditor columnEditor) {
|
||||
final Column column = columnEditor.create();
|
||||
|
||||
// if converters is not null and the default value is not null, we need to convert default value
|
||||
if (valueConverters != null && defaultValue != null) {
|
||||
final SchemaBuilder schemaBuilder = valueConverters.schemaBuilder(column);
|
||||
if (schemaBuilder == null) {
|
||||
return defaultValue;
|
||||
}
|
||||
final Schema schema = schemaBuilder.build();
|
||||
// In order to get the valueConverter for this column, we have to create a field;
|
||||
// The index value -1 in the field will never used when converting default value;
|
||||
// So we can set any number here;
|
||||
final Field field = new Field(columnEditor.name(), -1, schema);
|
||||
final ValueConverter valueConverter = valueConverters.converter(columnEditor.create(), field);
|
||||
return valueConverter.convert(defaultValue);
|
||||
}
|
||||
return defaultValue;
|
||||
}
|
||||
|
||||
public Table getTableSchemaFromChangeTable(SqlServerChangeTable changeTable) throws SQLException {
|
||||
final DatabaseMetaData metadata = connection().getMetaData();
|
||||
final TableId changeTableId = changeTable.getChangeTableId();
|
||||
@ -633,4 +490,13 @@ private Optional<Integer> getSqlServerVersion() {
|
||||
throw new RuntimeException("Couldn't obtain database server version", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setDefaultValue(ColumnEditor columnEditor, String defaultValue) {
|
||||
if (defaultValue != null) {
|
||||
defaultValueConverter
|
||||
.parseDefaultValue(columnEditor, defaultValue)
|
||||
.ifPresent(columnEditor::defaultValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -29,7 +29,8 @@ public class SqlServerDatabaseSchema extends HistorizedRelationalDatabaseSchema
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerDatabaseSchema.class);
|
||||
|
||||
public SqlServerDatabaseSchema(SqlServerConnectorConfig connectorConfig, ValueConverterProvider valueConverter, TopicSelector<TableId> topicSelector, SchemaNameAdjuster schemaNameAdjuster) {
|
||||
public SqlServerDatabaseSchema(SqlServerConnectorConfig connectorConfig, ValueConverterProvider valueConverter, TopicSelector<TableId> topicSelector,
|
||||
SchemaNameAdjuster schemaNameAdjuster) {
|
||||
super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), connectorConfig.getColumnFilter(),
|
||||
new TableSchemaBuilder(
|
||||
valueConverter,
|
||||
|
@ -0,0 +1,190 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
|
||||
package io.debezium.connector.sqlserver;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.kafka.connect.data.Field;
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.SchemaBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.debezium.connector.sqlserver.SqlServerConnection.ResultSetExtractor;
|
||||
import io.debezium.jdbc.JdbcConnection.StatementPreparer;
|
||||
import io.debezium.relational.Column;
|
||||
import io.debezium.relational.ColumnEditor;
|
||||
import io.debezium.relational.ValueConverter;
|
||||
import io.debezium.util.HexConverter;
|
||||
|
||||
import microsoft.sql.DateTimeOffset;
|
||||
|
||||
/**
|
||||
* Parses and converts column default values.
|
||||
*/
|
||||
class SqlServerDefaultValueConverter {
|
||||
|
||||
private static Logger LOGGER = LoggerFactory.getLogger(SqlServerDefaultValueConverter.class);
|
||||
|
||||
/**
|
||||
* Provides SQL connection instance.
|
||||
*/
|
||||
@FunctionalInterface
|
||||
interface ConnectionProvider {
|
||||
Connection get() throws SQLException;
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts JDBC string representation of a default column value to an object.
|
||||
*/
|
||||
@FunctionalInterface
|
||||
private interface DefaultValueMapper {
|
||||
|
||||
/**
|
||||
* Parses string to an object.
|
||||
*
|
||||
* @param value string representation
|
||||
* @return value
|
||||
* @throws Exception if there is an parsing error
|
||||
*/
|
||||
Object parse(String value) throws Exception;
|
||||
}
|
||||
|
||||
private final ConnectionProvider connectionProvider;
|
||||
private final SqlServerValueConverters valueConverters;
|
||||
private final Map<String, DefaultValueMapper> defaultValueMappers;
|
||||
|
||||
SqlServerDefaultValueConverter(ConnectionProvider connectionProvider, SqlServerValueConverters valueConverters) {
|
||||
this.connectionProvider = connectionProvider;
|
||||
this.valueConverters = valueConverters;
|
||||
this.defaultValueMappers = createDefaultValueMappers();
|
||||
}
|
||||
|
||||
Optional<Object> parseDefaultValue(ColumnEditor columnEditor, String defaultValue) {
|
||||
final String dataType = columnEditor.typeName();
|
||||
final DefaultValueMapper mapper = defaultValueMappers.get(dataType);
|
||||
if (mapper == null) {
|
||||
LOGGER.warn("Mapper for type '{}' not found.", dataType);
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
try {
|
||||
Object rawDefaultValue = mapper.parse(defaultValue);
|
||||
Object convertedDefaultValue = convertDefaultValue(rawDefaultValue, columnEditor);
|
||||
return Optional.of(convertedDefaultValue);
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOGGER.warn("Cannot parse column default value '{}' to type '{}'.", defaultValue, dataType, e);
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
private Object convertDefaultValue(Object defaultValue, ColumnEditor columnEditor) {
|
||||
final Column column = columnEditor.create();
|
||||
|
||||
// if converters is not null and the default value is not null, we need to convert default value
|
||||
if (valueConverters != null && defaultValue != null) {
|
||||
final SchemaBuilder schemaBuilder = valueConverters.schemaBuilder(column);
|
||||
if (schemaBuilder == null) {
|
||||
return defaultValue;
|
||||
}
|
||||
final Schema schema = schemaBuilder.build();
|
||||
// In order to get the valueConverter for this column, we have to create a field;
|
||||
// The index value -1 in the field will never used when converting default value;
|
||||
// So we can set any number here;
|
||||
final Field field = new Field(columnEditor.name(), -1, schema);
|
||||
final ValueConverter valueConverter = valueConverters.converter(columnEditor.create(), field);
|
||||
return valueConverter.convert(defaultValue);
|
||||
}
|
||||
return defaultValue;
|
||||
}
|
||||
|
||||
private Map<String, DefaultValueMapper> createDefaultValueMappers() {
|
||||
final Map<String, DefaultValueMapper> result = new HashMap<>();
|
||||
|
||||
// Exact numbers
|
||||
result.put("bigint", v -> Long.parseLong(v.substring(2, v.length() - 3))); // Sample value: ((3147483648.))
|
||||
result.put("int", v -> Integer.parseInt(v.substring(2, v.length() - 2))); // Sample value: ((2147483647))
|
||||
result.put("smallint", v -> Short.parseShort(v.substring(2, v.length() - 2))); // Sample value: ((32767))
|
||||
result.put("tinyint", v -> Short.parseShort(v.substring(2, v.length() - 2))); // Sample value: ((255))
|
||||
result.put("bit", v -> v.equals("((1))")); // Either ((1)) or ((0))
|
||||
result.put("decimal", v -> new BigDecimal(v.substring(2, v.length() - 2))); // Sample value: ((100.12345))
|
||||
result.put("numeric", v -> new BigDecimal(v.substring(2, v.length() - 2))); // Sample value: ((100.12345))
|
||||
result.put("money", v -> new BigDecimal(v.substring(2, v.length() - 2))); // Sample value: ((922337203685477.58))
|
||||
result.put("smallmoney", v -> new BigDecimal(v.substring(2, v.length() - 2))); // Sample value: ((214748.3647))
|
||||
|
||||
// Approximate numerics
|
||||
result.put("float", v -> Double.parseDouble(v.substring(2, v.length() - 2))); // Sample value: ((1.2345000000000000e+003))
|
||||
result.put("real", v -> Float.parseFloat(v.substring(2, v.length() - 2))); // Sample value: ((1.2345000000000000e+003))
|
||||
|
||||
// Date and time
|
||||
result.put("date", v -> { // Sample value: ('2019-02-03')
|
||||
String rawValue = v.substring(2, v.length() - 2);
|
||||
return querySingleValue("SELECT PARSE(? AS date)", st -> st.setString(1, rawValue), rs -> rs.getDate(1));
|
||||
});
|
||||
result.put("datetime", v -> { // Sample value: ('2019-01-01 00:00:00.000')
|
||||
String rawValue = v.substring(2, v.length() - 2);
|
||||
return querySingleValue("SELECT PARSE(? AS datetime)", st -> st.setString(1, rawValue), rs -> rs.getTimestamp(1));
|
||||
});
|
||||
result.put("datetime2", v -> { // Sample value: ('2019-01-01 00:00:00.1234567')
|
||||
String rawValue = v.substring(2, v.length() - 2);
|
||||
return querySingleValue("SELECT PARSE(? AS datetime2)", st -> st.setString(1, rawValue), rs -> rs.getTimestamp(1));
|
||||
});
|
||||
result.put("datetimeoffset", v -> { // Sample value: ('2019-01-01 00:00:00.1234567+02:00')
|
||||
String rawValue = v.substring(2, v.length() - 2);
|
||||
return querySingleValue("SELECT PARSE(? AS datetimeoffset)", st -> st.setString(1, rawValue), rs -> (DateTimeOffset) rs.getObject(1));
|
||||
});
|
||||
result.put("smalldatetime", v -> { // Sample value: ('2019-01-01 00:00:00')
|
||||
String rawValue = v.substring(2, v.length() - 2);
|
||||
return querySingleValue("SELECT PARSE(? AS smalldatetime)", st -> st.setString(1, rawValue), rs -> rs.getTimestamp(1));
|
||||
});
|
||||
result.put("time", v -> { // Sample value: ('2019-01-01 00:00:00')
|
||||
String rawValue = v.substring(2, v.length() - 2);
|
||||
return querySingleValue("SELECT PARSE(? AS time)", st -> st.setString(1, rawValue), rs -> rs.getTime(1));
|
||||
});
|
||||
|
||||
// Character strings
|
||||
result.put("char", v -> v.substring(2, v.length() - 2)); // Sample value: ('aaa')
|
||||
result.put("text", v -> v.substring(2, v.length() - 2)); // Sample value: ('aaa')
|
||||
result.put("varchar", v -> v.substring(2, v.length() - 2)); // Sample value: ('aaa')
|
||||
|
||||
// Unicode character strings
|
||||
result.put("nchar", v -> v.substring(2, v.length() - 2)); // Sample value: ('aaa')
|
||||
result.put("ntext", v -> v.substring(2, v.length() - 2)); // Sample value: ('aaa')
|
||||
result.put("nvarchar", v -> v.substring(2, v.length() - 2)); // Sample value: ('aaa')
|
||||
|
||||
// Binary strings
|
||||
result.put("binary", v -> HexConverter.convertFromHex(v.substring(3, v.length() - 1))); // Sample value: (0x0102030405)
|
||||
result.put("image", v -> HexConverter.convertFromHex(v.substring(3, v.length() - 1))); // Sample value: (0x0102030405)
|
||||
result.put("varbinary", v -> HexConverter.convertFromHex(v.substring(3, v.length() - 1))); // Sample value: (0x0102030405)
|
||||
|
||||
// Other data types, such as cursor, xml or uniqueidentifier, have been omitted.
|
||||
return result;
|
||||
}
|
||||
|
||||
private <T> T querySingleValue(String queryString, StatementPreparer preparer, ResultSetExtractor<T> extractor) throws SQLException {
|
||||
PreparedStatement preparedStatement = connectionProvider.get().prepareStatement(queryString);
|
||||
preparer.accept(preparedStatement);
|
||||
try (ResultSet resultSet = preparedStatement.executeQuery()) {
|
||||
if (resultSet.next()) {
|
||||
final T result = extractor.apply(resultSet);
|
||||
if (!resultSet.next()) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
throw new IllegalStateException("Exactly one result expected.");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -754,7 +754,7 @@ public void changeColumn() throws Exception {
|
||||
@Test
|
||||
public void addDefaultValue() throws Exception {
|
||||
final Configuration config = TestHelper.defaultConfig()
|
||||
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY)
|
||||
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
|
||||
.build();
|
||||
|
||||
start(SqlServerConnector.class, config);
|
||||
@ -780,7 +780,7 @@ public void alterDefaultValue() throws Exception {
|
||||
TestHelper.enableTableCdc(connection, "table_dv");
|
||||
|
||||
final Configuration config = TestHelper.defaultConfig()
|
||||
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY)
|
||||
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
|
||||
.build();
|
||||
|
||||
start(SqlServerConnector.class, config);
|
||||
|
@ -19,6 +19,7 @@
|
||||
import org.junit.Test;
|
||||
|
||||
import io.debezium.connector.sqlserver.util.TestHelper;
|
||||
import io.debezium.doc.FixFor;
|
||||
import io.debezium.relational.Column;
|
||||
import io.debezium.relational.Table;
|
||||
import io.debezium.relational.TableId;
|
||||
@ -105,6 +106,7 @@ public void shouldEnableCdcWithWrapperFunctionsForTable() throws Exception {
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-1015")
|
||||
public void shouldProperlyGetDefaultColumnValues() throws Exception {
|
||||
try (SqlServerConnection connection = TestHelper.adminConnection()) {
|
||||
connection.connect();
|
||||
@ -176,7 +178,7 @@ public void shouldProperlyGetDefaultColumnValues() throws Exception {
|
||||
// and issue a test call to a CDC wrapper function
|
||||
Thread.sleep(5_000); // Need to wait to make sure the min_lsn is available
|
||||
|
||||
ChangeTable changeTable = new ChangeTable(new TableId("testDB", "dbo", "table_with_defaults"),
|
||||
SqlServerChangeTable changeTable = new SqlServerChangeTable(new TableId("testDB", "dbo", "table_with_defaults"),
|
||||
null, 0, null, null);
|
||||
Table table = connection.getTableSchemaFromTable(changeTable);
|
||||
|
||||
|
@ -1431,7 +1431,8 @@ public void shouldCaptureTableSchema() throws SQLException, InterruptedException
|
||||
assertConnectorIsRunning();
|
||||
TestHelper.waitForSnapshotToBeCompleted();
|
||||
|
||||
connection.execute("INSERT INTO table_schema_test (key_cola, key_colb, cola, colb, colc, cold) VALUES(1, 'a', 100, '2019-01-01 10:20:39.1234567 +02:00', 'some_value', 100.20)");
|
||||
connection.execute(
|
||||
"INSERT INTO table_schema_test (key_cola, key_colb, cola, colb, colc, cold) VALUES(1, 'a', 100, '2019-01-01 10:20:39.1234567 +02:00', 'some_value', 100.20)");
|
||||
|
||||
List<SourceRecord> records = consumeRecordsByTopic(1).recordsForTopic("server1.dbo.table_schema_test");
|
||||
assertThat(records).hasSize(1);
|
||||
@ -1440,19 +1441,19 @@ public void shouldCaptureTableSchema() throws SQLException, InterruptedException
|
||||
.name("server1.dbo.table_schema_test.Key")
|
||||
.field("key_cola", Schema.INT32_SCHEMA)
|
||||
.field("key_colb", Schema.STRING_SCHEMA)
|
||||
.build()
|
||||
)
|
||||
.build())
|
||||
.valueAfterFieldSchemaIsEqualTo(SchemaBuilder.struct()
|
||||
.optional()
|
||||
.name("server1.dbo.table_schema_test.Value")
|
||||
.field("key_cola", Schema.INT32_SCHEMA)
|
||||
.field("key_colb", Schema.STRING_SCHEMA)
|
||||
.field("cola", Schema.INT32_SCHEMA)
|
||||
.field("colb", SchemaBuilder.string().name("io.debezium.time.ZonedTimestamp").required().defaultValue("2019-01-01T12:34:56.1234567+04:00").version(1).build())
|
||||
.field("colb",
|
||||
SchemaBuilder.string().name("io.debezium.time.ZonedTimestamp").required().defaultValue("2019-01-01T12:34:56.1234567+04:00").version(1)
|
||||
.build())
|
||||
.field("colc", SchemaBuilder.string().optional().defaultValue("default_value").build())
|
||||
.field("cold", Schema.OPTIONAL_FLOAT64_SCHEMA)
|
||||
.build()
|
||||
);
|
||||
.build());
|
||||
|
||||
stopConnector();
|
||||
}
|
||||
|
@ -208,11 +208,13 @@ private static void dropTestDatabase(SqlServerConnection connection) throws SQLE
|
||||
}
|
||||
|
||||
public static SqlServerConnection adminConnection() {
|
||||
return new SqlServerConnection(TestHelper.adminJdbcConfig(), Clock.system(), SourceTimestampMode.getDefaultMode(), new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE));
|
||||
return new SqlServerConnection(TestHelper.adminJdbcConfig(), Clock.system(), SourceTimestampMode.getDefaultMode(),
|
||||
new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE));
|
||||
}
|
||||
|
||||
public static SqlServerConnection testConnection() {
|
||||
return new SqlServerConnection(TestHelper.defaultJdbcConfig(), Clock.system(), SourceTimestampMode.getDefaultMode(), new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE));
|
||||
return new SqlServerConnection(TestHelper.defaultJdbcConfig(), Clock.system(), SourceTimestampMode.getDefaultMode(),
|
||||
new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1112,12 +1112,8 @@ protected Optional<ColumnEditor> readTableColumn(ResultSet columnMetadata, Table
|
||||
column.generated("YES".equalsIgnoreCase(autogenerated));
|
||||
|
||||
column.nativeType(resolveNativeType(column.typeName()));
|
||||
<<<<<<< HEAD
|
||||
column.jdbcType(resolveJdbcType(columnMetadata.getInt(5), column.nativeType()));
|
||||
parseDefaultValue(columnType, columnMetadata.getString(13)).ifPresent(column::defaultValue);
|
||||
=======
|
||||
setDefaultValue(column, columnMetadata.getString(13));
|
||||
>>>>>>> DBZ-1491 Parse temporal values using queries on database
|
||||
return Optional.of(column);
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user