From bddf71532451e49f12d7c63757e3cf5d7df51d6c Mon Sep 17 00:00:00 2001 From: Gunnar Morling Date: Tue, 25 Jul 2017 14:34:21 +0200 Subject: [PATCH] DBZ-287 Avoiding double conversion; changing test to use a larger scale --- .../postgresql/PostgresValueConverter.java | 42 +++++++++---------- .../AbstractRecordsProducerTest.java | 5 +-- .../postgresql/PostgresSchemaIT.java | 2 +- .../test/resources/postgres_create_tables.ddl | 2 +- 4 files changed, 24 insertions(+), 27 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java index 8f519aa19..48d3efa18 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java @@ -8,7 +8,6 @@ import java.math.BigDecimal; import java.math.BigInteger; -import java.math.MathContext; import java.sql.SQLException; import java.time.Instant; import java.time.LocalDateTime; @@ -41,23 +40,23 @@ /** * A provider of {@link ValueConverter}s and {@link SchemaBuilder}s for various Postgres specific column types. - * + * * In addition to handling data type conversion from values coming from JDBC, this is also expected to handle data type * conversion for data types coming from the logical decoding plugin. - * + * * @author Horia Chiorean (hchiorea@redhat.com) */ public class PostgresValueConverter extends JdbcValueConverters { - + /** * The approximation used by the plugin when converting a duration to micros */ protected static final double DAYS_PER_MONTH_AVG = 365.25 / 12.0d; - + protected PostgresValueConverter(boolean adaptiveTimePrecision, ZoneOffset defaultOffset) { super(DecimalMode.PRECISE, adaptiveTimePrecision, defaultOffset, null); } - + @Override public SchemaBuilder schemaBuilder(Column column) { int oidValue = PgOid.jdbcColumnToOid(column); @@ -91,7 +90,7 @@ public SchemaBuilder schemaBuilder(Column column) { return super.schemaBuilder(column); } } - + @Override public ValueConverter converter(Column column, Field fieldDefn) { int oidValue = PgOid.jdbcColumnToOid(column); @@ -122,7 +121,7 @@ public ValueConverter converter(Column column, Field fieldDefn) { return super.converter(column, fieldDefn); } } - + @Override protected Object convertDecimal(Column column, Field fieldDefn, Object data) { BigDecimal newDecimal = (BigDecimal) super.convertDecimal(column, fieldDefn, data); @@ -130,8 +129,7 @@ protected Object convertDecimal(Column column, Field fieldDefn, Object data) { return newDecimal; } if (column.scale() > newDecimal.scale()) { - MathContext mc = new MathContext((newDecimal.precision() - newDecimal.scale()) + column.scale()); - newDecimal = new BigDecimal(newDecimal.doubleValue(), mc); + newDecimal = newDecimal.setScale(column.scale()); } return newDecimal; } @@ -143,7 +141,7 @@ protected Object convertBit(Column column, Field fieldDefn, Object data) { } return super.convertBit(column, fieldDefn, data); } - + @Override protected Object convertBits(Column column, Field fieldDefn, Object data, int numBytes) { if (data instanceof PGobject) { @@ -163,7 +161,7 @@ protected Object convertBits(Column column, Field fieldDefn, Object data, int nu } return super.convertBits(column, fieldDefn, data, numBytes); } - + protected Object convertMoney(Column column, Field fieldDefn, Object data) { if (data == null) { data = fieldDefn.schema().defaultValue(); @@ -181,7 +179,7 @@ protected Object convertMoney(Column column, Field fieldDefn, Object data) { } return handleUnknownData(column, fieldDefn, data); } - + protected Object convertInterval(Column column, Field fieldDefn, Object data) { if (data == null) { data = fieldDefn.schema().defaultValue(); @@ -201,7 +199,7 @@ protected Object convertInterval(Column column, Field fieldDefn, Object data) { } return handleUnknownData(column, fieldDefn, data); } - + @Override protected Object convertTimestampToEpochMillis(Column column, Field fieldDefn, Object data) { if (data instanceof Long) { @@ -209,7 +207,7 @@ protected Object convertTimestampToEpochMillis(Column column, Field fieldDefn, O } return super.convertTimestampToEpochMillis(column, fieldDefn, data); } - + @Override protected Object convertTimestampToEpochMicros(Column column, Field fieldDefn, Object data) { if (data instanceof Long) { @@ -217,7 +215,7 @@ protected Object convertTimestampToEpochMicros(Column column, Field fieldDefn, O } return super.convertTimestampToEpochMicros(column, fieldDefn, data); } - + @Override protected Object convertTimestampToEpochNanos(Column column, Field fieldDefn, Object data) { if (data instanceof Long) { @@ -225,7 +223,7 @@ protected Object convertTimestampToEpochNanos(Column column, Field fieldDefn, Ob } return super.convertTimestampToEpochNanos(column, fieldDefn, data); } - + @Override protected Object convertTimestampToEpochMillisAsDate(Column column, Field fieldDefn, Object data) { if (data instanceof Long) { @@ -233,7 +231,7 @@ protected Object convertTimestampToEpochMillisAsDate(Column column, Field fieldD } return super.convertTimestampToEpochMillisAsDate(column, fieldDefn, data); } - + @Override protected Object convertTimestampWithZone(Column column, Field fieldDefn, Object data) { if (data instanceof Long) { @@ -259,7 +257,7 @@ protected Object convertTimeWithZone(Column column, Field fieldDefn, Object data } return super.convertTimeWithZone(column, fieldDefn, data); } - + private static LocalDateTime nanosToLocalDateTimeUTC(long epocNanos) { // the pg plugin stores date/time info as microseconds since epoch BigInteger epochMicrosBigInt = BigInteger.valueOf(epocNanos); @@ -267,10 +265,10 @@ private static LocalDateTime nanosToLocalDateTimeUTC(long epocNanos) { return LocalDateTime.ofInstant(Instant.ofEpochSecond(secondsAndNanos[0].longValue(), secondsAndNanos[1].longValue()), ZoneOffset.UTC); } - + /** * Converts a value representing a Postgres point for a column, to a Kafka Connect value. - * + * * @param column the JDBC column; never null * @param fieldDefn the Connect field definition for this column; never null * @param data a data for the point column, either coming from the JDBC driver or logical decoding plugin @@ -304,4 +302,4 @@ protected Object convertPoint(Column column, Field fieldDefn, Object data) { } return handleUnknownData(column, fieldDefn, data); } -} \ No newline at end of file +} diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java index b28788287..17f91f608 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java @@ -14,7 +14,6 @@ import static org.junit.Assert.fail; import java.math.BigDecimal; -import java.math.MathContext; import java.nio.ByteBuffer; import java.time.Instant; import java.time.LocalDate; @@ -91,8 +90,8 @@ protected List schemasAndValuesForNumericType() { return Arrays.asList(new SchemaAndValueField("si", SchemaBuilder.OPTIONAL_INT16_SCHEMA, (short) 1), new SchemaAndValueField("i", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 123456), new SchemaAndValueField("bi", SchemaBuilder.OPTIONAL_INT64_SCHEMA, 1234567890123L), - new SchemaAndValueField("d", Decimal.builder(2).optional().build(), new BigDecimal(1.10, new MathContext(3))), - new SchemaAndValueField("n", Decimal.builder(3).optional().build(), new BigDecimal(22.220, new MathContext(5))), + new SchemaAndValueField("d", Decimal.builder(2).optional().build(), new BigDecimal("1.10")), + new SchemaAndValueField("n", Decimal.builder(4).optional().build(), new BigDecimal("22.2200")), new SchemaAndValueField("r", Schema.OPTIONAL_FLOAT32_SCHEMA, 3.3f), new SchemaAndValueField("db", Schema.OPTIONAL_FLOAT64_SCHEMA, 4.44d), new SchemaAndValueField("ss", Schema.INT16_SCHEMA, (short) 1), diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSchemaIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSchemaIT.java index b60d9b3d2..932dc9de9 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSchemaIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSchemaIT.java @@ -66,7 +66,7 @@ public void shouldLoadSchemaForBuiltinPostgresTypes() throws Exception { Arrays.stream(TEST_TABLES).forEach(tableId -> assertKeySchema(tableId, "pk", Schema.INT32_SCHEMA)); assertTableSchema("public.numeric_table", "si, i, bi, d, n, r, db, ss, bs, b", Schema.OPTIONAL_INT16_SCHEMA, Schema.OPTIONAL_INT32_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA, - Decimal.builder(2).optional().build(), Decimal.builder(3).optional().build(), Schema.OPTIONAL_FLOAT32_SCHEMA, + Decimal.builder(2).optional().build(), Decimal.builder(4).optional().build(), Schema.OPTIONAL_FLOAT32_SCHEMA, Schema.OPTIONAL_FLOAT64_SCHEMA, Schema.INT16_SCHEMA, Schema.INT64_SCHEMA, Schema.OPTIONAL_BOOLEAN_SCHEMA); assertTableSchema("public.string_table", "vc, vcv, ch, c, t", Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA, diff --git a/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl b/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl index 3ccd86cad..6105ac27d 100644 --- a/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl +++ b/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl @@ -1,7 +1,7 @@ -- Generate a number of tables to cover as many of the PG types as possible DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public; -CREATE TABLE numeric_table (pk SERIAL, si SMALLINT, i INTEGER, bi BIGINT, d DECIMAL(3,2), n NUMERIC(5,3), r REAL, db DOUBLE PRECISION, ss SMALLSERIAL, bs BIGSERIAL, b BOOLEAN, PRIMARY KEY(pk)); +CREATE TABLE numeric_table (pk SERIAL, si SMALLINT, i INTEGER, bi BIGINT, d DECIMAL(3,2), n NUMERIC(6,4), r REAL, db DOUBLE PRECISION, ss SMALLSERIAL, bs BIGSERIAL, b BOOLEAN, PRIMARY KEY(pk)); CREATE TABLE string_table (pk SERIAL, vc VARCHAR(2), vcv CHARACTER VARYING(2), ch CHARACTER(4), c CHAR(3), t TEXT, PRIMARY KEY(pk)); CREATE TABLE cash_table (pk SERIAL, csh MONEY, PRIMARY KEY(pk)); CREATE TABLE bitbin_table (pk SERIAL, ba BYTEA, bol BIT(1), bs BIT(2), bv BIT VARYING(2) , PRIMARY KEY(pk));