diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java index 2b37f63e8..412690e44 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java @@ -31,6 +31,7 @@ import io.debezium.data.Bits; import io.debezium.data.Json; import io.debezium.data.Uuid; +import io.debezium.data.VariableScaleDecimal; import io.debezium.data.geometry.Point; import io.debezium.jdbc.JdbcValueConverters; import io.debezium.relational.Column; @@ -54,6 +55,13 @@ public class PostgresValueConverter extends JdbcValueConverters { */ protected static final double DAYS_PER_MONTH_AVG = 365.25 / 12.0d; + /** + * Variable scale decimal/numeric is defined by metadata + * scale - 0 + * length - 131089 + */ + private static final int VARIABLE_SCALE_DECIMAL_LENGTH = 131089; + protected PostgresValueConverter(DecimalMode decimalMode, boolean adaptiveTimePrecision, ZoneOffset defaultOffset) { super(decimalMode, adaptiveTimePrecision, defaultOffset, null); } @@ -87,6 +95,8 @@ public SchemaBuilder schemaBuilder(Column column) { return Point.builder(); case PgOid.MONEY: return Decimal.builder(column.scale()); + case PgOid.NUMERIC: + return numericSchema(column).optional(); case PgOid.INT2_ARRAY: return SchemaBuilder.array(SchemaBuilder.OPTIONAL_INT16_SCHEMA); case PgOid.INT4_ARRAY: @@ -96,14 +106,7 @@ public SchemaBuilder schemaBuilder(Column column) { case PgOid.TEXT_ARRAY: return SchemaBuilder.array(SchemaBuilder.OPTIONAL_STRING_SCHEMA); case PgOid.NUMERIC_ARRAY: - switch (decimalMode) { - case DOUBLE: - return SchemaBuilder.array(Schema.OPTIONAL_FLOAT64_SCHEMA); - case PRECISE: - // values are fixed-precision decimal values with exact precision. - // Use Kafka Connect's arbitrary precision decimal type and use the column's specified scale ... - return SchemaBuilder.array(Decimal.builder(column.scale()).optional().build()); - } + return SchemaBuilder.array(numericSchema(column).optional()); case PgOid.FLOAT4_ARRAY: return SchemaBuilder.array(Schema.OPTIONAL_FLOAT32_SCHEMA); case PgOid.FLOAT8_ARRAY: @@ -142,6 +145,17 @@ public SchemaBuilder schemaBuilder(Column column) { } } + private SchemaBuilder numericSchema(final Column column) { + switch (decimalMode) { + case DOUBLE: + return SchemaBuilder.float64(); + case PRECISE: + return isVariableScaleDecimal(column) ? VariableScaleDecimal.builder() : Decimal.builder(column.scale()); + default: + throw new IllegalArgumentException("Unknown decimalMode"); + } + } + @Override public ValueConverter converter(Column column, Field fieldDefn) { int oidValue = PgOid.jdbcColumnToOid(column); @@ -216,6 +230,9 @@ protected Object convertDecimal(Column column, Field fieldDefn, Object data) { if (column.scale() > newDecimal.scale()) { newDecimal = newDecimal.setScale(column.scale()); } + if (isVariableScaleDecimal(column)) { + return VariableScaleDecimal.fromLogical(fieldDefn.schema(), (BigDecimal)newDecimal); + } return newDecimal; } @@ -398,4 +415,8 @@ protected Object convertArray(Column column, Field fieldDefn, Object data) { } return data; } + + private boolean isVariableScaleDecimal(final Column column) { + return column.scale() == 0 && column.length() == VARIABLE_SCALE_DECIMAL_LENGTH; + } } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java index 77b4b557f..edf8eaf3b 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java @@ -45,6 +45,7 @@ import io.debezium.data.Bits; import io.debezium.data.Json; import io.debezium.data.Uuid; +import io.debezium.data.VariableScaleDecimal; import io.debezium.data.Xml; import io.debezium.data.geometry.Point; import io.debezium.relational.TableId; @@ -77,8 +78,10 @@ public abstract class AbstractRecordsProducerTest { "'barbar'::xml, 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::UUID)"; protected static final String INSERT_STRING_TYPES_STMT = "INSERT INTO string_table (vc, vcv, ch, c, t) " + "VALUES ('aa', 'bb', 'cdef', 'abc', 'some text')"; - protected static final String INSERT_NUMERIC_TYPES_STMT = "INSERT INTO numeric_table (si, i, bi, d, n, r, db, ss, bs, b) " + - "VALUES (1, 123456, 1234567890123, 1.1, 22.22, 3.3, 4.44, 1, 123, true)"; + protected static final String INSERT_NUMERIC_TYPES_STMT = "INSERT INTO numeric_table (si, i, bi, r, db, ss, bs, b) " + + "VALUES (1, 123456, 1234567890123, 3.3, 4.44, 1, 123, true)"; + protected static final String INSERT_NUMERIC_DECIMAL_TYPES_STMT = "INSERT INTO numeric_decimal_table (d, dzs, dvs, n, nzs, nvs) " + + "VALUES (1.1, 10.11, 10.1111, 22.22, 22.2, 22.2222)"; protected static final String INSERT_TSTZRANGE_TYPES_STMT = "INSERT INTO tstzrange_table (unbounded_exclusive_range, bounded_inclusive_range) " + "VALUES ('[2017-06-05 11:29:12.549426+00,)', '[2017-06-05 11:29:12.549426+00, 2017-06-05 12:34:56.789012+00]')"; @@ -90,7 +93,8 @@ public abstract class AbstractRecordsProducerTest { protected static final String INSERT_QUOTED_TYPES_STMT = "INSERT INTO \"Quoted_\"\" . Schema\".\"Quoted_\"\" . Table\" (\"Quoted_\"\" . Text_Column\") " + "VALUES ('some text')"; - protected static final Set ALL_STMTS = new HashSet<>(Arrays.asList(INSERT_NUMERIC_TYPES_STMT, INSERT_DATE_TIME_TYPES_STMT, + protected static final Set ALL_STMTS = new HashSet<>(Arrays.asList(INSERT_NUMERIC_TYPES_STMT, INSERT_NUMERIC_DECIMAL_TYPES_STMT, + INSERT_DATE_TIME_TYPES_STMT, INSERT_BIN_TYPES_STMT, INSERT_GEOM_TYPES_STMT, INSERT_TEXT_TYPES_STMT, INSERT_CASH_TYPES_STMT, INSERT_STRING_TYPES_STMT, INSERT_ARRAY_TYPES_STMT, INSERT_QUOTED_TYPES_STMT)); @@ -99,8 +103,6 @@ protected List schemasAndValuesForNumericType() { return Arrays.asList(new SchemaAndValueField("si", SchemaBuilder.OPTIONAL_INT16_SCHEMA, (short) 1), new SchemaAndValueField("i", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 123456), new SchemaAndValueField("bi", SchemaBuilder.OPTIONAL_INT64_SCHEMA, 1234567890123L), - new SchemaAndValueField("d", Decimal.builder(2).optional().build(), new BigDecimal("1.10")), - new SchemaAndValueField("n", Decimal.builder(4).optional().build(), new BigDecimal("22.2200")), new SchemaAndValueField("r", Schema.OPTIONAL_FLOAT32_SCHEMA, 3.3f), new SchemaAndValueField("db", Schema.OPTIONAL_FLOAT64_SCHEMA, 4.44d), new SchemaAndValueField("ss", Schema.INT16_SCHEMA, (short) 1), @@ -108,6 +110,32 @@ protected List schemasAndValuesForNumericType() { new SchemaAndValueField("b", Schema.OPTIONAL_BOOLEAN_SCHEMA, Boolean.TRUE)); } + protected List schemasAndValuesForNumericDecimalType() { + final Struct dvs = new Struct(VariableScaleDecimal.schema()); + dvs.put("scale", 4).put("value", new BigDecimal("10.1111").unscaledValue().toByteArray()); + final Struct nvs = new Struct(VariableScaleDecimal.schema()); + nvs.put("scale", 4).put("value", new BigDecimal("22.2222").unscaledValue().toByteArray()); + return Arrays.asList( + new SchemaAndValueField("d", Decimal.builder(2).optional().build(), new BigDecimal("1.10")), + // DBZ-351 new SchemaAndValueField("dzs", Decimal.builder(0).optional().build(), new BigDecimal("10")), + new SchemaAndValueField("dvs", VariableScaleDecimal.builder().optional().build(), dvs), + new SchemaAndValueField("n", Decimal.builder(4).optional().build(), new BigDecimal("22.2200")), + // DBZ-351 new SchemaAndValueField("nzs", Decimal.builder(0).optional().build(), new BigDecimal("22")), + new SchemaAndValueField("nvs", VariableScaleDecimal.builder().optional().build(), nvs) + ); + } + + protected List schemasAndValuesForImpreciseNumericDecimalType() { + return Arrays.asList( + new SchemaAndValueField("d", Schema.OPTIONAL_FLOAT64_SCHEMA, 1.1d), + new SchemaAndValueField("dzs", Schema.OPTIONAL_FLOAT64_SCHEMA, 10d), + new SchemaAndValueField("dvs", Schema.OPTIONAL_FLOAT64_SCHEMA, 10.1111d), + new SchemaAndValueField("n", Schema.OPTIONAL_FLOAT64_SCHEMA, 22.22d), + new SchemaAndValueField("nzs", Schema.OPTIONAL_FLOAT64_SCHEMA, 22d), + new SchemaAndValueField("nvs", Schema.OPTIONAL_FLOAT64_SCHEMA, 22.2222d) + ); + } + protected List schemasAndValuesForStringTypes() { return Arrays.asList(new SchemaAndValueField("vc", Schema.OPTIONAL_STRING_SCHEMA, "aa"), new SchemaAndValueField("vcv", Schema.OPTIONAL_STRING_SCHEMA, "bb"), @@ -197,6 +225,8 @@ protected List schemasAndValuesForTable(String insertTableS switch (insertTableStatement) { case INSERT_NUMERIC_TYPES_STMT: return schemasAndValuesForNumericType(); + case INSERT_NUMERIC_DECIMAL_TYPES_STMT: + return schemasAndValuesForNumericDecimalType(); case INSERT_BIN_TYPES_STMT: return schemaAndValuesForBinTypes(); case INSERT_CASH_TYPES_STMT: @@ -285,11 +315,33 @@ private void assertValue(Struct content) { assertEquals("Incorrect value type for " + fieldName, value.getClass(), actualValue.getClass()); if (actualValue instanceof byte[]) { assertArrayEquals("Values don't match for " + fieldName, (byte[]) value, (byte[]) actualValue); + } else if (actualValue instanceof Struct) { + assertStruct((Struct)value, (Struct)actualValue); } else { assertEquals("Values don't match for " + fieldName, value, actualValue); } } + private void assertStruct(final Struct expectedStruct, final Struct actualStruct) { + expectedStruct.schema().fields().stream().forEach(field -> { + final Object expectedValue = actualStruct.get(field); + if (expectedValue == null) { + assertNull(fieldName + " is present in the actual content", actualStruct.get(field.name())); + return; + } + final Object actualValue = actualStruct.get(field.name()); + assertNotNull("No value found for " + fieldName, actualValue); + assertEquals("Incorrect value type for " + fieldName, expectedValue.getClass(), actualValue.getClass()); + if (actualValue instanceof byte[]) { + assertArrayEquals("Values don't match for " + fieldName, (byte[]) expectedValue, (byte[]) actualValue); + } else if (actualValue instanceof Struct) { + assertStruct((Struct)expectedValue, (Struct)actualValue); + } else { + assertEquals("Values don't match for " + fieldName, expectedValue, actualValue); + } + }); + } + private void assertSchema(Struct content) { if (schema == null) { return; diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CustomTypeEncodingTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CustomTypeEncodingTest.java new file mode 100644 index 000000000..65d6036ba --- /dev/null +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CustomTypeEncodingTest.java @@ -0,0 +1,28 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.connector.postgresql; + +import static org.junit.Assert.assertEquals; + +import java.math.BigDecimal; + +import org.apache.kafka.connect.data.Struct; +import org.junit.Test; + +import io.debezium.data.VariableScaleDecimal; + +public class CustomTypeEncodingTest { + + @Test + public void testVariableScaleDecimal() { + final BigDecimal testValue = new BigDecimal("138.456"); + final Struct struct = VariableScaleDecimal.fromLogical(VariableScaleDecimal.schema(), testValue); + final BigDecimal decodedValue = VariableScaleDecimal.toLogical(struct); + assertEquals("Number should be same after serde", testValue, decodedValue); + } + +} diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSchemaIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSchemaIT.java index 59b0e3519..1f7df1dca 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSchemaIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSchemaIT.java @@ -27,6 +27,7 @@ import io.debezium.data.Bits; import io.debezium.data.Json; import io.debezium.data.Uuid; +import io.debezium.data.VariableScaleDecimal; import io.debezium.data.Xml; import io.debezium.data.geometry.Point; import io.debezium.relational.Table; @@ -48,8 +49,8 @@ */ public class PostgresSchemaIT { - private static final String[] TEST_TABLES = new String[] { "public.numeric_table", "public.string_table", "public.cash_table", - "public.bitbin_table", + private static final String[] TEST_TABLES = new String[] { "public.numeric_table", "public.numeric_decimal_table", "public.string_table", + "public.cash_table","public.bitbin_table", "public.time_table", "public.text_table", "public.geom_table", "public.tstzrange_table", "public.array_table", "\"Quoted_\"\" . Schema\".\"Quoted_\"\" . Table\"" }; @@ -69,10 +70,17 @@ public void shouldLoadSchemaForBuiltinPostgresTypes() throws Exception { schema.refresh(connection, false); assertTablesIncluded(TEST_TABLES); Arrays.stream(TEST_TABLES).forEach(tableId -> assertKeySchema(tableId, "pk", Schema.INT32_SCHEMA)); - assertTableSchema("public.numeric_table", "si, i, bi, d, n, r, db, ss, bs, b", - Schema.OPTIONAL_INT16_SCHEMA, Schema.OPTIONAL_INT32_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA, - Decimal.builder(2).optional().build(), Decimal.builder(4).optional().build(), Schema.OPTIONAL_FLOAT32_SCHEMA, + assertTableSchema("public.numeric_table", "si, i, bi, r, db, ss, bs, b", + Schema.OPTIONAL_INT16_SCHEMA, Schema.OPTIONAL_INT32_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_FLOAT32_SCHEMA, Schema.OPTIONAL_FLOAT64_SCHEMA, Schema.INT16_SCHEMA, Schema.INT64_SCHEMA, Schema.OPTIONAL_BOOLEAN_SCHEMA); + assertTableSchema("public.numeric_decimal_table", "d, dzs, dvs, n, nzs, nvs", + Decimal.builder(2).optional().build(), + Decimal.builder(0).optional().build(), + VariableScaleDecimal.builder().optional().build(), + Decimal.builder(4).optional().build(), + Decimal.builder(0).optional().build(), + VariableScaleDecimal.builder().optional().build() + ); assertTableSchema("public.string_table", "vc, vcv, ch, c, t", Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java index dfeea5087..2b4d1f2ca 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java @@ -16,7 +16,6 @@ import java.util.List; import java.util.concurrent.TimeUnit; -import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.source.SourceRecord; import org.junit.After; @@ -68,6 +67,10 @@ public void shouldReceiveChangesForInsertsWithDifferentDataTypes() throws Except //numerical types assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericType()); + //numerical decimal types + consumer.expects(1); + assertInsert(INSERT_NUMERIC_DECIMAL_TYPES_STMT, schemasAndValuesForNumericDecimalType()); + // string types consumer.expects(1); assertInsert(INSERT_STRING_TYPES_STMT, schemasAndValuesForStringTypes()); @@ -319,11 +322,7 @@ public void shouldReceiveNumericTypeAsDouble() throws Exception { consumer = testConsumer(1); recordsProducer.start(consumer); - List schemasAndValuesForNumericType = schemasAndValuesForNumericType(); - schemasAndValuesForNumericType.set(3, new SchemaAndValueField("d", Schema.OPTIONAL_FLOAT64_SCHEMA, 1.1d)); - schemasAndValuesForNumericType.set(4, new SchemaAndValueField("n", Schema.OPTIONAL_FLOAT64_SCHEMA, 22.22d)); - - assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericType); + assertInsert(INSERT_NUMERIC_DECIMAL_TYPES_STMT, schemasAndValuesForImpreciseNumericDecimalType()); } private void assertInsert(String statement, List expectedSchemaAndValuesByColumn) { diff --git a/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl b/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl index 1a60a1e79..a7448674d 100644 --- a/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl +++ b/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl @@ -3,7 +3,9 @@ -- Generate a number of tables to cover as many of the PG types as possible DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public; -CREATE TABLE numeric_table (pk SERIAL, si SMALLINT, i INTEGER, bi BIGINT, d DECIMAL(3,2), n NUMERIC(6,4), r REAL, db DOUBLE PRECISION, ss SMALLSERIAL, bs BIGSERIAL, b BOOLEAN, PRIMARY KEY(pk)); +CREATE TABLE numeric_table (pk SERIAL, si SMALLINT, i INTEGER, bi BIGINT, r REAL, db DOUBLE PRECISION, ss SMALLSERIAL, bs BIGSERIAL, b BOOLEAN, PRIMARY KEY(pk)); +-- no suffix -fixed scale, zs - zero scale, vs - variable scale +CREATE TABLE numeric_decimal_table (pk SERIAL, d DECIMAL(3,2), dzs DECIMAL(4), dvs DECIMAL, n NUMERIC(6,4), nzs NUMERIC(4), nvs NUMERIC, PRIMARY KEY(pk)); CREATE TABLE string_table (pk SERIAL, vc VARCHAR(2), vcv CHARACTER VARYING(2), ch CHARACTER(4), c CHAR(3), t TEXT, PRIMARY KEY(pk)); CREATE TABLE cash_table (pk SERIAL, csh MONEY, PRIMARY KEY(pk)); CREATE TABLE bitbin_table (pk SERIAL, ba BYTEA, bol BIT(1), bs BIT(2), bv BIT VARYING(2) , PRIMARY KEY(pk)); diff --git a/debezium-core/src/main/java/io/debezium/data/VariableScaleDecimal.java b/debezium-core/src/main/java/io/debezium/data/VariableScaleDecimal.java new file mode 100644 index 000000000..8b8edbb80 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/data/VariableScaleDecimal.java @@ -0,0 +1,73 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.data; + +import java.math.BigDecimal; +import java.math.BigInteger; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; + +/** + * An arbitrary precision decimal value with variable scale. + * + * @author Jiri Pechanec + * + */ +public class VariableScaleDecimal { + public static final String LOGICAL_NAME = "io.debezium.data.VariableScaleDecimal"; + public static final String SCALE_FIELD = "scale"; + public static final String VALUE_FIELD = "value"; + + /** + * Returns a {@link SchemaBuilder} for a VariableScaleDecimal. You can use the resulting SchemaBuilder + * to set additional schema settings such as required/optional, default value, and documentation. + * + * @return the schema builder + */ + public static SchemaBuilder builder() { + return SchemaBuilder.struct() + .name(LOGICAL_NAME) + .version(1) + .doc("Variable scaled decimal") + .field(SCALE_FIELD, Schema.INT32_SCHEMA) + .field(VALUE_FIELD, Schema.BYTES_SCHEMA); + } + + /** + * Returns a Schema for a VariableScaleDecimal but with all other default Schema settings. + * + * @return the schema + * @see #builder() + */ + public static Schema schema() { + return builder().build(); + } + + /** + * Converts a value from its logical format (BigDecimal) to it's encoded format - a struct containing + * the scale of the number and a binary representation of the number + * + * @param value the logical value + * @return the encoded value + */ + public static Struct fromLogical(final Schema schema, final BigDecimal value) { + Struct result = new Struct(schema); + return result.put(SCALE_FIELD, value.scale()).put(VALUE_FIELD, value.unscaledValue().toByteArray()); + } + + /** + * Decodes the encoded value - see {@link #fromLogical(Schema, BigDecimal)} for encoding format + * + * @param value the encoded value + * @return the decoded value + */ + public static BigDecimal toLogical(final Struct value) { + return new BigDecimal(new BigInteger((byte[])value.getBytes(VALUE_FIELD)), value.getInt32(SCALE_FIELD)); + } +}