DBZ-953 Ability to configure "decimal.handling.mode" for SQL Server connector
This commit is contained in:
parent
bb09d2c900
commit
6bbead4e1f
@ -17,6 +17,7 @@
|
||||
import io.debezium.document.Document;
|
||||
import io.debezium.heartbeat.Heartbeat;
|
||||
import io.debezium.jdbc.JdbcConfiguration;
|
||||
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
|
||||
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
|
||||
import io.debezium.relational.RelationalDatabaseConnectorConfig;
|
||||
import io.debezium.relational.TableId;
|
||||
@ -208,6 +209,14 @@ public static SnapshotLockingMode parse(String value, String defaultValue) {
|
||||
+ "which means that the connector does not hold any locks for all monitored tables."
|
||||
+ "Using a value of '" + SnapshotLockingMode.EXCLUSIVE.getValue() + "' ensures that the connector holds the exlusive lock (and thus prevents any reads and updates) for all monitored tables.");
|
||||
|
||||
public static final Field DECIMAL_HANDLING_MODE = Field.create("decimal.handling.mode")
|
||||
.withDisplayName("Decimal Handling").withEnum(DecimalHandlingMode.class, DecimalHandlingMode.PRECISE)
|
||||
.withWidth(Width.SHORT).withImportance(Importance.MEDIUM).withDescription(
|
||||
"Specify how DECIMAL and NUMERIC columns should be represented in change events, including:"
|
||||
+ "'precise' (the default) uses java.math.BigDecimal to represent values, which are encoded in the change events using a binary representation and Kafka Connect's 'org.apache.kafka.connect.data.Decimal' type; "
|
||||
+ "'string' uses string to represent values; "
|
||||
+ "'double' represents values using Java's 'double', which may not offer the precision but will be far easier to use in consumers.");
|
||||
|
||||
/**
|
||||
* The set of {@link Field}s defined as part of this configuration.
|
||||
*/
|
||||
@ -286,4 +295,100 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) {
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the Decimal mode Enum for {@code decimal.handling.mode}
|
||||
* configuration This defaults to {@code precise} if nothing is provided.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public DecimalMode getDecimalModeConfig() {
|
||||
return DecimalHandlingMode
|
||||
.parse(this.getConfig().getString(DECIMAL_HANDLING_MODE))
|
||||
.asDecimalMode();
|
||||
}
|
||||
|
||||
/**
|
||||
* The set of predefined DecimalHandlingMode options or aliases.
|
||||
*/
|
||||
public enum DecimalHandlingMode implements EnumeratedValue {
|
||||
/**
|
||||
* Represent {@code DECIMAL} and {@code NUMERIC} values as precise
|
||||
* {@link BigDecimal} values, which are represented in change events in
|
||||
* a binary form. This is precise but difficult to use.
|
||||
*/
|
||||
PRECISE("precise"),
|
||||
|
||||
/**
|
||||
* Represent {@code DECIMAL} and {@code NUMERIC} values as a string
|
||||
* values. This is precise, it supports also special values but the type
|
||||
* information is lost.
|
||||
*/
|
||||
STRING("string"),
|
||||
|
||||
/**
|
||||
* Represent {@code DECIMAL} and {@code NUMERIC} values as precise
|
||||
* {@code double} values. This may be less precise but is far easier to
|
||||
* use.
|
||||
*/
|
||||
DOUBLE("double");
|
||||
|
||||
private final String value;
|
||||
|
||||
private DecimalHandlingMode(String value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
public DecimalMode asDecimalMode() {
|
||||
switch (this) {
|
||||
case DOUBLE:
|
||||
return DecimalMode.DOUBLE;
|
||||
case STRING:
|
||||
return DecimalMode.STRING;
|
||||
case PRECISE:
|
||||
default:
|
||||
return DecimalMode.PRECISE;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine if the supplied value is one of the predefined options.
|
||||
*
|
||||
* @param value
|
||||
* the configuration property value; may not be null
|
||||
* @return the matching option, or null if no match is found
|
||||
*/
|
||||
public static DecimalHandlingMode parse(String value) {
|
||||
if (value == null)
|
||||
return null;
|
||||
value = value.trim();
|
||||
for (DecimalHandlingMode option : DecimalHandlingMode.values()) {
|
||||
if (option.getValue().equalsIgnoreCase(value))
|
||||
return option;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine if the supplied value is one of the predefined options.
|
||||
*
|
||||
* @param value
|
||||
* the configuration property value; may not be null
|
||||
* @param defaultValue
|
||||
* the default value; may be null
|
||||
* @return the matching option, or null if no match is found and the
|
||||
* non-null default is invalid
|
||||
*/
|
||||
public static DecimalHandlingMode parse(String value, String defaultValue) {
|
||||
DecimalHandlingMode mode = parse(value);
|
||||
if (mode == null && defaultValue != null)
|
||||
mode = parse(defaultValue);
|
||||
return mode;
|
||||
}
|
||||
}
|
||||
}
|
@ -12,6 +12,7 @@
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.DecimalHandlingMode;
|
||||
import io.debezium.relational.HistorizedRelationalDatabaseSchema;
|
||||
import io.debezium.relational.Table;
|
||||
import io.debezium.relational.TableId;
|
||||
@ -37,7 +38,9 @@ public class SqlServerDatabaseSchema extends HistorizedRelationalDatabaseSchema
|
||||
|
||||
public SqlServerDatabaseSchema(SqlServerConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector<TableId> topicSelector, SqlServerConnection connection) {
|
||||
super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), null,
|
||||
new TableSchemaBuilder(new SqlServerValueConverters(), schemaNameAdjuster, SourceInfo.SCHEMA),
|
||||
new TableSchemaBuilder(new SqlServerValueConverters(DecimalHandlingMode
|
||||
.parse(connectorConfig.getConfig().getString(SqlServerConnectorConfig.DECIMAL_HANDLING_MODE))
|
||||
.asDecimalMode()), schemaNameAdjuster, SourceInfo.SCHEMA),
|
||||
false);
|
||||
try {
|
||||
this.capturedTables = determineCapturedTables(connectorConfig, connection);
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
import io.debezium.data.SpecialValueDecimal;
|
||||
import io.debezium.jdbc.JdbcValueConverters;
|
||||
import io.debezium.jdbc.TemporalPrecisionMode;
|
||||
import io.debezium.relational.Column;
|
||||
import io.debezium.relational.ValueConverter;
|
||||
import io.debezium.time.ZonedTimestamp;
|
||||
@ -31,6 +32,22 @@ public class SqlServerValueConverters extends JdbcValueConverters {
|
||||
public SqlServerValueConverters() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new instance that always uses UTC for the default time zone when
|
||||
* converting values without timezone information to values that require
|
||||
* timezones.
|
||||
* <p>
|
||||
*
|
||||
* @param decimalMode
|
||||
* how {@code DECIMAL} and {@code NUMERIC} values should be
|
||||
* treated; may be null if
|
||||
* {@link io.debezium.jdbc.JdbcValueConverters.DecimalMode#PRECISE}
|
||||
* is to be used
|
||||
*/
|
||||
public SqlServerValueConverters(DecimalMode decimalMode) {
|
||||
super(decimalMode, TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS, ZoneOffset.UTC, null, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SchemaBuilder schemaBuilder(Column column) {
|
||||
switch (column.jdbcType()) {
|
||||
|
@ -0,0 +1,156 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.connector.sqlserver;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.SQLException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.apache.kafka.connect.source.SourceRecord;
|
||||
import org.fest.assertions.Assertions;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.DecimalHandlingMode;
|
||||
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode;
|
||||
import io.debezium.connector.sqlserver.util.TestHelper;
|
||||
import io.debezium.embedded.AbstractConnectorTest;
|
||||
import io.debezium.util.Testing;
|
||||
|
||||
/**
|
||||
* Tests for numeric/decimal columsn with precise, string and decimal options
|
||||
*
|
||||
* @author Pradeep Mamillapalli
|
||||
*
|
||||
*/
|
||||
public class SQLServerNumericColumnIT extends AbstractConnectorTest {
|
||||
private SqlServerConnection connection;
|
||||
|
||||
/**
|
||||
* Create 2 Tables. Each table has 4 columns cola: Decimal(8,4) type with 8
|
||||
* precision and 4 scale colb: Decimal - Default precision(18) and default
|
||||
* scale(0) colc: numeric(7,1) - 7 precision and 1 scale cold: numeric-
|
||||
* Default precision(18) and default scale(0)
|
||||
*
|
||||
* @throws SQLException
|
||||
*/
|
||||
@Before
|
||||
public void before() throws SQLException {
|
||||
TestHelper.createTestDatabase();
|
||||
connection = TestHelper.testConnection();
|
||||
connection.execute(
|
||||
"CREATE TABLE tablenuma (id int IDENTITY(1,1) primary key, cola DECIMAL(8, 4),colb DECIMAL, colc numeric(8,1), cold numeric)",
|
||||
"CREATE TABLE tablenumb (id int IDENTITY(1,1) primary key, cola DECIMAL(8, 4),colb DECIMAL, colc numeric(8,1), cold numeric)",
|
||||
"CREATE TABLE tablenumc (id int IDENTITY(1,1) primary key, cola DECIMAL(8, 4),colb DECIMAL, colc numeric(8,1), cold numeric)",
|
||||
"CREATE TABLE tablenumd (id int IDENTITY(1,1) primary key, cola DECIMAL(8, 4),colb DECIMAL, colc numeric(8,1), cold numeric)");
|
||||
connection.enableTableCdc("tablea");
|
||||
connection.enableTableCdc("tableb");
|
||||
connection.enableTableCdc("tablec");
|
||||
connection.enableTableCdc("tabled");
|
||||
|
||||
initializeConnectorTestFramework();
|
||||
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() throws SQLException {
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert 1 Record into tablenuma with {@code DecimalHandlingMode.STRING}
|
||||
* mode Assertions: - Connector is running - 1 Record are streamed out of
|
||||
* cdc - Assert cola, colb, colc, cold are exactly equal to the input
|
||||
* values.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void decimalModeConfigString() throws Exception {
|
||||
final Configuration config = TestHelper.defaultConfig()
|
||||
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
|
||||
.with(SqlServerConnectorConfig.TABLE_WHITELIST, "dbo.tablenuma")
|
||||
.with(SqlServerConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.STRING).build();
|
||||
|
||||
start(SqlServerConnector.class, config);
|
||||
assertConnectorIsRunning();
|
||||
|
||||
connection.execute("INSERT INTO tablenuma VALUES (111.1111, 1111111, 1111111.1, 1111111 );");
|
||||
final SourceRecords records = consumeRecordsByTopic(1);
|
||||
final List<SourceRecord> tableA = records.recordsForTopic("server1.dbo.tablenuma");
|
||||
Assertions.assertThat(tableA).hasSize(1);
|
||||
final Struct valueA = (Struct) tableA.get(0).value();
|
||||
Assertions.assertThat(((Struct) valueA.get("after")).get("cola")).isEqualTo("111.1111");
|
||||
Assertions.assertThat(((Struct) valueA.get("after")).get("colb")).isEqualTo("1111111");
|
||||
Assertions.assertThat(((Struct) valueA.get("after")).get("colc")).isEqualTo("1111111.1");
|
||||
Assertions.assertThat(((Struct) valueA.get("after")).get("cold")).isEqualTo("1111111");
|
||||
stopConnector();
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert 1 Record into tablenumb with {@code DecimalHandlingMode.DOUBLE}
|
||||
* mode Assertions: - Connector is running - 1 Record are streamed out of
|
||||
* cdc - Assert cola, colb, colc, cold are exactly equal to the input values
|
||||
* in double format
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void decimalModeConfigDouble() throws Exception {
|
||||
final Configuration config = TestHelper.defaultConfig()
|
||||
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
|
||||
.with(SqlServerConnectorConfig.TABLE_WHITELIST, "dbo.tablenumb")
|
||||
.with(SqlServerConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE).build();
|
||||
|
||||
start(SqlServerConnector.class, config);
|
||||
assertConnectorIsRunning();
|
||||
|
||||
connection.execute("INSERT INTO tablenumb VALUES (222.2222, 22222, 22222.2, 2222222 );");
|
||||
final SourceRecords records = consumeRecordsByTopic(1);
|
||||
final List<SourceRecord> results = records.recordsForTopic("server1.dbo.tablenumb");
|
||||
Assertions.assertThat(results).hasSize(1);
|
||||
final Struct valueA = (Struct) results.get(0).value();
|
||||
Assertions.assertThat(((Struct) valueA.get("after")).get("cola")).isEqualTo(222.2222d);
|
||||
Assertions.assertThat(((Struct) valueA.get("after")).get("colb")).isEqualTo(22222d);
|
||||
Assertions.assertThat(((Struct) valueA.get("after")).get("colc")).isEqualTo(22222.2d);
|
||||
Assertions.assertThat(((Struct) valueA.get("after")).get("cold")).isEqualTo(2222222d);
|
||||
stopConnector();
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert 1 Record into tablenumc with {@code DecimalHandlingMode.PRECISE}
|
||||
* mode Assertions: - Connector is running - 1 Record are streamed out of
|
||||
* cdc - Assert cola, colb, colc, cold are bytes
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void decimalModeConfigPrecise() throws Exception {
|
||||
final Configuration config = TestHelper.defaultConfig()
|
||||
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
|
||||
.with(SqlServerConnectorConfig.TABLE_WHITELIST, "dbo.tablenumc")
|
||||
.with(SqlServerConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.PRECISE).build();
|
||||
|
||||
start(SqlServerConnector.class, config);
|
||||
assertConnectorIsRunning();
|
||||
|
||||
connection.execute("INSERT INTO tablenumc VALUES (333.3333, 3333, 3333.3, 33333333 );");
|
||||
final SourceRecords records = consumeRecordsByTopic(1);
|
||||
final List<SourceRecord> results = records.recordsForTopic("server1.dbo.tablenumc");
|
||||
Assertions.assertThat(results).hasSize(1);
|
||||
final Struct valueA = (Struct) results.get(0).value();
|
||||
Assertions.assertThat(((Struct) valueA.get("after")).get("cola")).isEqualTo(BigDecimal.valueOf(333.3333));
|
||||
Assertions.assertThat(((Struct) valueA.get("after")).get("colb")).isEqualTo(BigDecimal.valueOf(3333));
|
||||
Assertions.assertThat(((Struct) valueA.get("after")).get("colc")).isEqualTo(BigDecimal.valueOf(3333.3));
|
||||
Assertions.assertThat(((Struct) valueA.get("after")).get("cold")).isEqualTo(BigDecimal.valueOf(33333333));
|
||||
stopConnector();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user