diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java index c4073a071..77e211c68 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java @@ -17,6 +17,7 @@ import io.debezium.document.Document; import io.debezium.heartbeat.Heartbeat; import io.debezium.jdbc.JdbcConfiguration; +import io.debezium.jdbc.JdbcValueConverters.DecimalMode; import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig; import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.TableId; @@ -208,6 +209,14 @@ public static SnapshotLockingMode parse(String value, String defaultValue) { + "which means that the connector does not hold any locks for all monitored tables." + "Using a value of '" + SnapshotLockingMode.EXCLUSIVE.getValue() + "' ensures that the connector holds the exlusive lock (and thus prevents any reads and updates) for all monitored tables."); + public static final Field DECIMAL_HANDLING_MODE = Field.create("decimal.handling.mode") + .withDisplayName("Decimal Handling").withEnum(DecimalHandlingMode.class, DecimalHandlingMode.PRECISE) + .withWidth(Width.SHORT).withImportance(Importance.MEDIUM).withDescription( + "Specify how DECIMAL and NUMERIC columns should be represented in change events, including:" + + "'precise' (the default) uses java.math.BigDecimal to represent values, which are encoded in the change events using a binary representation and Kafka Connect's 'org.apache.kafka.connect.data.Decimal' type; " + + "'string' uses string to represent values; " + + "'double' represents values using Java's 'double', which may not offer the precision but will be far easier to use in consumers."); + /** * The set of {@link Field}s defined as part of this configuration. */ @@ -286,4 +295,100 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) { } }; } -} + + /** + * Returns the Decimal mode Enum for {@code decimal.handling.mode} + * configuration This defaults to {@code precise} if nothing is provided. + * + * @return + */ + public DecimalMode getDecimalModeConfig() { + return DecimalHandlingMode + .parse(this.getConfig().getString(DECIMAL_HANDLING_MODE)) + .asDecimalMode(); + } + + /** + * The set of predefined DecimalHandlingMode options or aliases. + */ + public enum DecimalHandlingMode implements EnumeratedValue { + /** + * Represent {@code DECIMAL} and {@code NUMERIC} values as precise + * {@link BigDecimal} values, which are represented in change events in + * a binary form. This is precise but difficult to use. + */ + PRECISE("precise"), + + /** + * Represent {@code DECIMAL} and {@code NUMERIC} values as a string + * values. This is precise, it supports also special values but the type + * information is lost. + */ + STRING("string"), + + /** + * Represent {@code DECIMAL} and {@code NUMERIC} values as precise + * {@code double} values. This may be less precise but is far easier to + * use. + */ + DOUBLE("double"); + + private final String value; + + private DecimalHandlingMode(String value) { + this.value = value; + } + + @Override + public String getValue() { + return value; + } + + public DecimalMode asDecimalMode() { + switch (this) { + case DOUBLE: + return DecimalMode.DOUBLE; + case STRING: + return DecimalMode.STRING; + case PRECISE: + default: + return DecimalMode.PRECISE; + } + } + + /** + * Determine if the supplied value is one of the predefined options. + * + * @param value + * the configuration property value; may not be null + * @return the matching option, or null if no match is found + */ + public static DecimalHandlingMode parse(String value) { + if (value == null) + return null; + value = value.trim(); + for (DecimalHandlingMode option : DecimalHandlingMode.values()) { + if (option.getValue().equalsIgnoreCase(value)) + return option; + } + return null; + } + + /** + * Determine if the supplied value is one of the predefined options. + * + * @param value + * the configuration property value; may not be null + * @param defaultValue + * the default value; may be null + * @return the matching option, or null if no match is found and the + * non-null default is invalid + */ + public static DecimalHandlingMode parse(String value, String defaultValue) { + DecimalHandlingMode mode = parse(value); + if (mode == null && defaultValue != null) + mode = parse(defaultValue); + return mode; + } + } +} \ No newline at end of file diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDatabaseSchema.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDatabaseSchema.java index a370538b9..d3dcd9e09 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDatabaseSchema.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDatabaseSchema.java @@ -12,6 +12,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.connector.sqlserver.SqlServerConnectorConfig.DecimalHandlingMode; import io.debezium.relational.HistorizedRelationalDatabaseSchema; import io.debezium.relational.Table; import io.debezium.relational.TableId; @@ -37,7 +38,9 @@ public class SqlServerDatabaseSchema extends HistorizedRelationalDatabaseSchema public SqlServerDatabaseSchema(SqlServerConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector topicSelector, SqlServerConnection connection) { super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), null, - new TableSchemaBuilder(new SqlServerValueConverters(), schemaNameAdjuster, SourceInfo.SCHEMA), + new TableSchemaBuilder(new SqlServerValueConverters(DecimalHandlingMode + .parse(connectorConfig.getConfig().getString(SqlServerConnectorConfig.DECIMAL_HANDLING_MODE)) + .asDecimalMode()), schemaNameAdjuster, SourceInfo.SCHEMA), false); try { this.capturedTables = determineCapturedTables(connectorConfig, connection); diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerValueConverters.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerValueConverters.java index 1b47774a0..40b3820e5 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerValueConverters.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerValueConverters.java @@ -15,6 +15,7 @@ import io.debezium.data.SpecialValueDecimal; import io.debezium.jdbc.JdbcValueConverters; +import io.debezium.jdbc.TemporalPrecisionMode; import io.debezium.relational.Column; import io.debezium.relational.ValueConverter; import io.debezium.time.ZonedTimestamp; @@ -31,6 +32,22 @@ public class SqlServerValueConverters extends JdbcValueConverters { public SqlServerValueConverters() { } + /** + * Create a new instance that always uses UTC for the default time zone when + * converting values without timezone information to values that require + * timezones. + *

+ * + * @param decimalMode + * how {@code DECIMAL} and {@code NUMERIC} values should be + * treated; may be null if + * {@link io.debezium.jdbc.JdbcValueConverters.DecimalMode#PRECISE} + * is to be used + */ + public SqlServerValueConverters(DecimalMode decimalMode) { + super(decimalMode, TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS, ZoneOffset.UTC, null, null); + } + @Override public SchemaBuilder schemaBuilder(Column column) { switch (column.jdbcType()) { diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SQLServerNumericColumnIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SQLServerNumericColumnIT.java new file mode 100644 index 000000000..9eda1904d --- /dev/null +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SQLServerNumericColumnIT.java @@ -0,0 +1,156 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import java.math.BigDecimal; +import java.sql.SQLException; +import java.util.List; + +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.fest.assertions.Assertions; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import io.debezium.config.Configuration; +import io.debezium.connector.sqlserver.SqlServerConnectorConfig.DecimalHandlingMode; +import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode; +import io.debezium.connector.sqlserver.util.TestHelper; +import io.debezium.embedded.AbstractConnectorTest; +import io.debezium.util.Testing; + +/** + * Tests for numeric/decimal columsn with precise, string and decimal options + * + * @author Pradeep Mamillapalli + * + */ +public class SQLServerNumericColumnIT extends AbstractConnectorTest { + private SqlServerConnection connection; + + /** + * Create 2 Tables. Each table has 4 columns cola: Decimal(8,4) type with 8 + * precision and 4 scale colb: Decimal - Default precision(18) and default + * scale(0) colc: numeric(7,1) - 7 precision and 1 scale cold: numeric- + * Default precision(18) and default scale(0) + * + * @throws SQLException + */ + @Before + public void before() throws SQLException { + TestHelper.createTestDatabase(); + connection = TestHelper.testConnection(); + connection.execute( + "CREATE TABLE tablenuma (id int IDENTITY(1,1) primary key, cola DECIMAL(8, 4),colb DECIMAL, colc numeric(8,1), cold numeric)", + "CREATE TABLE tablenumb (id int IDENTITY(1,1) primary key, cola DECIMAL(8, 4),colb DECIMAL, colc numeric(8,1), cold numeric)", + "CREATE TABLE tablenumc (id int IDENTITY(1,1) primary key, cola DECIMAL(8, 4),colb DECIMAL, colc numeric(8,1), cold numeric)", + "CREATE TABLE tablenumd (id int IDENTITY(1,1) primary key, cola DECIMAL(8, 4),colb DECIMAL, colc numeric(8,1), cold numeric)"); + connection.enableTableCdc("tablea"); + connection.enableTableCdc("tableb"); + connection.enableTableCdc("tablec"); + connection.enableTableCdc("tabled"); + + initializeConnectorTestFramework(); + Testing.Files.delete(TestHelper.DB_HISTORY_PATH); + } + + @After + public void after() throws SQLException { + if (connection != null) { + connection.close(); + } + } + + /** + * Insert 1 Record into tablenuma with {@code DecimalHandlingMode.STRING} + * mode Assertions: - Connector is running - 1 Record are streamed out of + * cdc - Assert cola, colb, colc, cold are exactly equal to the input + * values. + * + * @throws Exception + */ + @Test + public void decimalModeConfigString() throws Exception { + final Configuration config = TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) + .with(SqlServerConnectorConfig.TABLE_WHITELIST, "dbo.tablenuma") + .with(SqlServerConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.STRING).build(); + + start(SqlServerConnector.class, config); + assertConnectorIsRunning(); + + connection.execute("INSERT INTO tablenuma VALUES (111.1111, 1111111, 1111111.1, 1111111 );"); + final SourceRecords records = consumeRecordsByTopic(1); + final List tableA = records.recordsForTopic("server1.dbo.tablenuma"); + Assertions.assertThat(tableA).hasSize(1); + final Struct valueA = (Struct) tableA.get(0).value(); + Assertions.assertThat(((Struct) valueA.get("after")).get("cola")).isEqualTo("111.1111"); + Assertions.assertThat(((Struct) valueA.get("after")).get("colb")).isEqualTo("1111111"); + Assertions.assertThat(((Struct) valueA.get("after")).get("colc")).isEqualTo("1111111.1"); + Assertions.assertThat(((Struct) valueA.get("after")).get("cold")).isEqualTo("1111111"); + stopConnector(); + } + + /** + * Insert 1 Record into tablenumb with {@code DecimalHandlingMode.DOUBLE} + * mode Assertions: - Connector is running - 1 Record are streamed out of + * cdc - Assert cola, colb, colc, cold are exactly equal to the input values + * in double format + * + * @throws Exception + */ + @Test + public void decimalModeConfigDouble() throws Exception { + final Configuration config = TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) + .with(SqlServerConnectorConfig.TABLE_WHITELIST, "dbo.tablenumb") + .with(SqlServerConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE).build(); + + start(SqlServerConnector.class, config); + assertConnectorIsRunning(); + + connection.execute("INSERT INTO tablenumb VALUES (222.2222, 22222, 22222.2, 2222222 );"); + final SourceRecords records = consumeRecordsByTopic(1); + final List results = records.recordsForTopic("server1.dbo.tablenumb"); + Assertions.assertThat(results).hasSize(1); + final Struct valueA = (Struct) results.get(0).value(); + Assertions.assertThat(((Struct) valueA.get("after")).get("cola")).isEqualTo(222.2222d); + Assertions.assertThat(((Struct) valueA.get("after")).get("colb")).isEqualTo(22222d); + Assertions.assertThat(((Struct) valueA.get("after")).get("colc")).isEqualTo(22222.2d); + Assertions.assertThat(((Struct) valueA.get("after")).get("cold")).isEqualTo(2222222d); + stopConnector(); + } + + /** + * Insert 1 Record into tablenumc with {@code DecimalHandlingMode.PRECISE} + * mode Assertions: - Connector is running - 1 Record are streamed out of + * cdc - Assert cola, colb, colc, cold are bytes + * + * @throws Exception + */ + @Test + public void decimalModeConfigPrecise() throws Exception { + final Configuration config = TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) + .with(SqlServerConnectorConfig.TABLE_WHITELIST, "dbo.tablenumc") + .with(SqlServerConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.PRECISE).build(); + + start(SqlServerConnector.class, config); + assertConnectorIsRunning(); + + connection.execute("INSERT INTO tablenumc VALUES (333.3333, 3333, 3333.3, 33333333 );"); + final SourceRecords records = consumeRecordsByTopic(1); + final List results = records.recordsForTopic("server1.dbo.tablenumc"); + Assertions.assertThat(results).hasSize(1); + final Struct valueA = (Struct) results.get(0).value(); + Assertions.assertThat(((Struct) valueA.get("after")).get("cola")).isEqualTo(BigDecimal.valueOf(333.3333)); + Assertions.assertThat(((Struct) valueA.get("after")).get("colb")).isEqualTo(BigDecimal.valueOf(3333)); + Assertions.assertThat(((Struct) valueA.get("after")).get("colc")).isEqualTo(BigDecimal.valueOf(3333.3)); + Assertions.assertThat(((Struct) valueA.get("after")).get("cold")).isEqualTo(BigDecimal.valueOf(33333333)); + stopConnector(); + } +}