DBZ-953 Ability to configure "decimal.handling.mode" for SQL Server connector

This commit is contained in:
pradeepmvn 2018-10-27 00:08:52 -04:00 committed by Jiri Pechanec
parent 4cf3b0343b
commit d8e78b71ff
4 changed files with 283 additions and 2 deletions

View File

@ -17,6 +17,7 @@
import io.debezium.document.Document;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
@ -208,6 +209,14 @@ public static SnapshotLockingMode parse(String value, String defaultValue) {
+ "which means that the connector does not hold any locks for all monitored tables."
+ "Using a value of '" + SnapshotLockingMode.EXCLUSIVE.getValue() + "' ensures that the connector holds the exlusive lock (and thus prevents any reads and updates) for all monitored tables.");
public static final Field DECIMAL_HANDLING_MODE = Field.create("decimal.handling.mode")
.withDisplayName("Decimal Handling").withEnum(DecimalHandlingMode.class, DecimalHandlingMode.PRECISE)
.withWidth(Width.SHORT).withImportance(Importance.MEDIUM).withDescription(
"Specify how DECIMAL and NUMERIC columns should be represented in change events, including:"
+ "'precise' (the default) uses java.math.BigDecimal to represent values, which are encoded in the change events using a binary representation and Kafka Connect's 'org.apache.kafka.connect.data.Decimal' type; "
+ "'string' uses string to represent values; "
+ "'double' represents values using Java's 'double', which may not offer the precision but will be far easier to use in consumers.");
/**
* The set of {@link Field}s defined as part of this configuration.
*/
@ -286,4 +295,100 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) {
}
};
}
}
/**
* Returns the Decimal mode Enum for {@code decimal.handling.mode}
* configuration This defaults to {@code precise} if nothing is provided.
*
* @return
*/
public DecimalMode getDecimalModeConfig() {
return DecimalHandlingMode
.parse(this.getConfig().getString(DECIMAL_HANDLING_MODE))
.asDecimalMode();
}
/**
* The set of predefined DecimalHandlingMode options or aliases.
*/
public enum DecimalHandlingMode implements EnumeratedValue {
/**
* Represent {@code DECIMAL} and {@code NUMERIC} values as precise
* {@link BigDecimal} values, which are represented in change events in
* a binary form. This is precise but difficult to use.
*/
PRECISE("precise"),
/**
* Represent {@code DECIMAL} and {@code NUMERIC} values as a string
* values. This is precise, it supports also special values but the type
* information is lost.
*/
STRING("string"),
/**
* Represent {@code DECIMAL} and {@code NUMERIC} values as precise
* {@code double} values. This may be less precise but is far easier to
* use.
*/
DOUBLE("double");
private final String value;
private DecimalHandlingMode(String value) {
this.value = value;
}
@Override
public String getValue() {
return value;
}
public DecimalMode asDecimalMode() {
switch (this) {
case DOUBLE:
return DecimalMode.DOUBLE;
case STRING:
return DecimalMode.STRING;
case PRECISE:
default:
return DecimalMode.PRECISE;
}
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value
* the configuration property value; may not be null
* @return the matching option, or null if no match is found
*/
public static DecimalHandlingMode parse(String value) {
if (value == null)
return null;
value = value.trim();
for (DecimalHandlingMode option : DecimalHandlingMode.values()) {
if (option.getValue().equalsIgnoreCase(value))
return option;
}
return null;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value
* the configuration property value; may not be null
* @param defaultValue
* the default value; may be null
* @return the matching option, or null if no match is found and the
* non-null default is invalid
*/
public static DecimalHandlingMode parse(String value, String defaultValue) {
DecimalHandlingMode mode = parse(value);
if (mode == null && defaultValue != null)
mode = parse(defaultValue);
return mode;
}
}
}

View File

@ -12,6 +12,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.DecimalHandlingMode;
import io.debezium.relational.HistorizedRelationalDatabaseSchema;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
@ -37,7 +38,9 @@ public class SqlServerDatabaseSchema extends HistorizedRelationalDatabaseSchema
public SqlServerDatabaseSchema(SqlServerConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector<TableId> topicSelector, SqlServerConnection connection) {
super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), null,
new TableSchemaBuilder(new SqlServerValueConverters(), schemaNameAdjuster, SourceInfo.SCHEMA),
new TableSchemaBuilder(new SqlServerValueConverters(DecimalHandlingMode
.parse(connectorConfig.getConfig().getString(SqlServerConnectorConfig.DECIMAL_HANDLING_MODE))
.asDecimalMode()), schemaNameAdjuster, SourceInfo.SCHEMA),
false);
try {
this.capturedTables = determineCapturedTables(connectorConfig, connection);

View File

@ -15,6 +15,7 @@
import io.debezium.data.SpecialValueDecimal;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter;
import io.debezium.time.ZonedTimestamp;
@ -31,6 +32,22 @@ public class SqlServerValueConverters extends JdbcValueConverters {
public SqlServerValueConverters() {
}
/**
* Create a new instance that always uses UTC for the default time zone when
* converting values without timezone information to values that require
* timezones.
* <p>
*
* @param decimalMode
* how {@code DECIMAL} and {@code NUMERIC} values should be
* treated; may be null if
* {@link io.debezium.jdbc.JdbcValueConverters.DecimalMode#PRECISE}
* is to be used
*/
public SqlServerValueConverters(DecimalMode decimalMode) {
super(decimalMode, TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS, ZoneOffset.UTC, null, null);
}
@Override
public SchemaBuilder schemaBuilder(Column column) {
switch (column.jdbcType()) {

View File

@ -0,0 +1,156 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import java.math.BigDecimal;
import java.sql.SQLException;
import java.util.List;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.DecimalHandlingMode;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Testing;
/**
* Tests for numeric/decimal columsn with precise, string and decimal options
*
* @author Pradeep Mamillapalli
*
*/
public class SQLServerNumericColumnIT extends AbstractConnectorTest {
private SqlServerConnection connection;
/**
* Create 2 Tables. Each table has 4 columns cola: Decimal(8,4) type with 8
* precision and 4 scale colb: Decimal - Default precision(18) and default
* scale(0) colc: numeric(7,1) - 7 precision and 1 scale cold: numeric-
* Default precision(18) and default scale(0)
*
* @throws SQLException
*/
@Before
public void before() throws SQLException {
TestHelper.createTestDatabase();
connection = TestHelper.testConnection();
connection.execute(
"CREATE TABLE tablenuma (id int IDENTITY(1,1) primary key, cola DECIMAL(8, 4),colb DECIMAL, colc numeric(8,1), cold numeric)",
"CREATE TABLE tablenumb (id int IDENTITY(1,1) primary key, cola DECIMAL(8, 4),colb DECIMAL, colc numeric(8,1), cold numeric)",
"CREATE TABLE tablenumc (id int IDENTITY(1,1) primary key, cola DECIMAL(8, 4),colb DECIMAL, colc numeric(8,1), cold numeric)",
"CREATE TABLE tablenumd (id int IDENTITY(1,1) primary key, cola DECIMAL(8, 4),colb DECIMAL, colc numeric(8,1), cold numeric)");
connection.enableTableCdc("tablea");
connection.enableTableCdc("tableb");
connection.enableTableCdc("tablec");
connection.enableTableCdc("tabled");
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
}
@After
public void after() throws SQLException {
if (connection != null) {
connection.close();
}
}
/**
* Insert 1 Record into tablenuma with {@code DecimalHandlingMode.STRING}
* mode Assertions: - Connector is running - 1 Record are streamed out of
* cdc - Assert cola, colb, colc, cold are exactly equal to the input
* values.
*
* @throws Exception
*/
@Test
public void decimalModeConfigString() throws Exception {
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(SqlServerConnectorConfig.TABLE_WHITELIST, "dbo.tablenuma")
.with(SqlServerConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.STRING).build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
connection.execute("INSERT INTO tablenuma VALUES (111.1111, 1111111, 1111111.1, 1111111 );");
final SourceRecords records = consumeRecordsByTopic(1);
final List<SourceRecord> tableA = records.recordsForTopic("server1.dbo.tablenuma");
Assertions.assertThat(tableA).hasSize(1);
final Struct valueA = (Struct) tableA.get(0).value();
Assertions.assertThat(((Struct) valueA.get("after")).get("cola")).isEqualTo("111.1111");
Assertions.assertThat(((Struct) valueA.get("after")).get("colb")).isEqualTo("1111111");
Assertions.assertThat(((Struct) valueA.get("after")).get("colc")).isEqualTo("1111111.1");
Assertions.assertThat(((Struct) valueA.get("after")).get("cold")).isEqualTo("1111111");
stopConnector();
}
/**
* Insert 1 Record into tablenumb with {@code DecimalHandlingMode.DOUBLE}
* mode Assertions: - Connector is running - 1 Record are streamed out of
* cdc - Assert cola, colb, colc, cold are exactly equal to the input values
* in double format
*
* @throws Exception
*/
@Test
public void decimalModeConfigDouble() throws Exception {
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(SqlServerConnectorConfig.TABLE_WHITELIST, "dbo.tablenumb")
.with(SqlServerConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE).build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
connection.execute("INSERT INTO tablenumb VALUES (222.2222, 22222, 22222.2, 2222222 );");
final SourceRecords records = consumeRecordsByTopic(1);
final List<SourceRecord> results = records.recordsForTopic("server1.dbo.tablenumb");
Assertions.assertThat(results).hasSize(1);
final Struct valueA = (Struct) results.get(0).value();
Assertions.assertThat(((Struct) valueA.get("after")).get("cola")).isEqualTo(222.2222d);
Assertions.assertThat(((Struct) valueA.get("after")).get("colb")).isEqualTo(22222d);
Assertions.assertThat(((Struct) valueA.get("after")).get("colc")).isEqualTo(22222.2d);
Assertions.assertThat(((Struct) valueA.get("after")).get("cold")).isEqualTo(2222222d);
stopConnector();
}
/**
* Insert 1 Record into tablenumc with {@code DecimalHandlingMode.PRECISE}
* mode Assertions: - Connector is running - 1 Record are streamed out of
* cdc - Assert cola, colb, colc, cold are bytes
*
* @throws Exception
*/
@Test
public void decimalModeConfigPrecise() throws Exception {
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(SqlServerConnectorConfig.TABLE_WHITELIST, "dbo.tablenumc")
.with(SqlServerConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.PRECISE).build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
connection.execute("INSERT INTO tablenumc VALUES (333.3333, 3333, 3333.3, 33333333 );");
final SourceRecords records = consumeRecordsByTopic(1);
final List<SourceRecord> results = records.recordsForTopic("server1.dbo.tablenumc");
Assertions.assertThat(results).hasSize(1);
final Struct valueA = (Struct) results.get(0).value();
Assertions.assertThat(((Struct) valueA.get("after")).get("cola")).isEqualTo(BigDecimal.valueOf(333.3333));
Assertions.assertThat(((Struct) valueA.get("after")).get("colb")).isEqualTo(BigDecimal.valueOf(3333));
Assertions.assertThat(((Struct) valueA.get("after")).get("colc")).isEqualTo(BigDecimal.valueOf(3333.3));
Assertions.assertThat(((Struct) valueA.get("after")).get("cold")).isEqualTo(BigDecimal.valueOf(33333333));
stopConnector();
}
}