DBZ-318 Support for a Decimale with variable scale schema

This commit is contained in:
Jiri Pechanec 2017-09-06 11:27:50 +02:00 committed by Gunnar Morling
parent 99f39038bb
commit ba3d7d762b
7 changed files with 208 additions and 25 deletions

View File

@ -31,6 +31,7 @@
import io.debezium.data.Bits;
import io.debezium.data.Json;
import io.debezium.data.Uuid;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.data.geometry.Point;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.relational.Column;
@ -54,6 +55,13 @@ public class PostgresValueConverter extends JdbcValueConverters {
*/
protected static final double DAYS_PER_MONTH_AVG = 365.25 / 12.0d;
/**
* Variable scale decimal/numeric is defined by metadata
* scale - 0
* length - 131089
*/
private static final int VARIABLE_SCALE_DECIMAL_LENGTH = 131089;
protected PostgresValueConverter(DecimalMode decimalMode, boolean adaptiveTimePrecision, ZoneOffset defaultOffset) {
super(decimalMode, adaptiveTimePrecision, defaultOffset, null);
}
@ -87,6 +95,8 @@ public SchemaBuilder schemaBuilder(Column column) {
return Point.builder();
case PgOid.MONEY:
return Decimal.builder(column.scale());
case PgOid.NUMERIC:
return numericSchema(column).optional();
case PgOid.INT2_ARRAY:
return SchemaBuilder.array(SchemaBuilder.OPTIONAL_INT16_SCHEMA);
case PgOid.INT4_ARRAY:
@ -96,14 +106,7 @@ public SchemaBuilder schemaBuilder(Column column) {
case PgOid.TEXT_ARRAY:
return SchemaBuilder.array(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
case PgOid.NUMERIC_ARRAY:
switch (decimalMode) {
case DOUBLE:
return SchemaBuilder.array(Schema.OPTIONAL_FLOAT64_SCHEMA);
case PRECISE:
// values are fixed-precision decimal values with exact precision.
// Use Kafka Connect's arbitrary precision decimal type and use the column's specified scale ...
return SchemaBuilder.array(Decimal.builder(column.scale()).optional().build());
}
return SchemaBuilder.array(numericSchema(column).optional());
case PgOid.FLOAT4_ARRAY:
return SchemaBuilder.array(Schema.OPTIONAL_FLOAT32_SCHEMA);
case PgOid.FLOAT8_ARRAY:
@ -142,6 +145,17 @@ public SchemaBuilder schemaBuilder(Column column) {
}
}
private SchemaBuilder numericSchema(final Column column) {
switch (decimalMode) {
case DOUBLE:
return SchemaBuilder.float64();
case PRECISE:
return isVariableScaleDecimal(column) ? VariableScaleDecimal.builder() : Decimal.builder(column.scale());
default:
throw new IllegalArgumentException("Unknown decimalMode");
}
}
@Override
public ValueConverter converter(Column column, Field fieldDefn) {
int oidValue = PgOid.jdbcColumnToOid(column);
@ -216,6 +230,9 @@ protected Object convertDecimal(Column column, Field fieldDefn, Object data) {
if (column.scale() > newDecimal.scale()) {
newDecimal = newDecimal.setScale(column.scale());
}
if (isVariableScaleDecimal(column)) {
return VariableScaleDecimal.fromLogical(fieldDefn.schema(), (BigDecimal)newDecimal);
}
return newDecimal;
}
@ -398,4 +415,8 @@ protected Object convertArray(Column column, Field fieldDefn, Object data) {
}
return data;
}
private boolean isVariableScaleDecimal(final Column column) {
return column.scale() == 0 && column.length() == VARIABLE_SCALE_DECIMAL_LENGTH;
}
}

View File

@ -45,6 +45,7 @@
import io.debezium.data.Bits;
import io.debezium.data.Json;
import io.debezium.data.Uuid;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.data.Xml;
import io.debezium.data.geometry.Point;
import io.debezium.relational.TableId;
@ -77,8 +78,10 @@ public abstract class AbstractRecordsProducerTest {
"'<foo>bar</foo><foo>bar</foo>'::xml, 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::UUID)";
protected static final String INSERT_STRING_TYPES_STMT = "INSERT INTO string_table (vc, vcv, ch, c, t) " +
"VALUES ('aa', 'bb', 'cdef', 'abc', 'some text')";
protected static final String INSERT_NUMERIC_TYPES_STMT = "INSERT INTO numeric_table (si, i, bi, d, n, r, db, ss, bs, b) " +
"VALUES (1, 123456, 1234567890123, 1.1, 22.22, 3.3, 4.44, 1, 123, true)";
protected static final String INSERT_NUMERIC_TYPES_STMT = "INSERT INTO numeric_table (si, i, bi, r, db, ss, bs, b) " +
"VALUES (1, 123456, 1234567890123, 3.3, 4.44, 1, 123, true)";
protected static final String INSERT_NUMERIC_DECIMAL_TYPES_STMT = "INSERT INTO numeric_decimal_table (d, dzs, dvs, n, nzs, nvs) " +
"VALUES (1.1, 10.11, 10.1111, 22.22, 22.2, 22.2222)";
protected static final String INSERT_TSTZRANGE_TYPES_STMT = "INSERT INTO tstzrange_table (unbounded_exclusive_range, bounded_inclusive_range) " +
"VALUES ('[2017-06-05 11:29:12.549426+00,)', '[2017-06-05 11:29:12.549426+00, 2017-06-05 12:34:56.789012+00]')";
@ -90,7 +93,8 @@ public abstract class AbstractRecordsProducerTest {
protected static final String INSERT_QUOTED_TYPES_STMT = "INSERT INTO \"Quoted_\"\" . Schema\".\"Quoted_\"\" . Table\" (\"Quoted_\"\" . Text_Column\") " +
"VALUES ('some text')";
protected static final Set<String> ALL_STMTS = new HashSet<>(Arrays.asList(INSERT_NUMERIC_TYPES_STMT, INSERT_DATE_TIME_TYPES_STMT,
protected static final Set<String> ALL_STMTS = new HashSet<>(Arrays.asList(INSERT_NUMERIC_TYPES_STMT, INSERT_NUMERIC_DECIMAL_TYPES_STMT,
INSERT_DATE_TIME_TYPES_STMT,
INSERT_BIN_TYPES_STMT, INSERT_GEOM_TYPES_STMT, INSERT_TEXT_TYPES_STMT,
INSERT_CASH_TYPES_STMT, INSERT_STRING_TYPES_STMT, INSERT_ARRAY_TYPES_STMT,
INSERT_QUOTED_TYPES_STMT));
@ -99,8 +103,6 @@ protected List<SchemaAndValueField> schemasAndValuesForNumericType() {
return Arrays.asList(new SchemaAndValueField("si", SchemaBuilder.OPTIONAL_INT16_SCHEMA, (short) 1),
new SchemaAndValueField("i", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 123456),
new SchemaAndValueField("bi", SchemaBuilder.OPTIONAL_INT64_SCHEMA, 1234567890123L),
new SchemaAndValueField("d", Decimal.builder(2).optional().build(), new BigDecimal("1.10")),
new SchemaAndValueField("n", Decimal.builder(4).optional().build(), new BigDecimal("22.2200")),
new SchemaAndValueField("r", Schema.OPTIONAL_FLOAT32_SCHEMA, 3.3f),
new SchemaAndValueField("db", Schema.OPTIONAL_FLOAT64_SCHEMA, 4.44d),
new SchemaAndValueField("ss", Schema.INT16_SCHEMA, (short) 1),
@ -108,6 +110,32 @@ protected List<SchemaAndValueField> schemasAndValuesForNumericType() {
new SchemaAndValueField("b", Schema.OPTIONAL_BOOLEAN_SCHEMA, Boolean.TRUE));
}
protected List<SchemaAndValueField> schemasAndValuesForNumericDecimalType() {
final Struct dvs = new Struct(VariableScaleDecimal.schema());
dvs.put("scale", 4).put("value", new BigDecimal("10.1111").unscaledValue().toByteArray());
final Struct nvs = new Struct(VariableScaleDecimal.schema());
nvs.put("scale", 4).put("value", new BigDecimal("22.2222").unscaledValue().toByteArray());
return Arrays.asList(
new SchemaAndValueField("d", Decimal.builder(2).optional().build(), new BigDecimal("1.10")),
// DBZ-351 new SchemaAndValueField("dzs", Decimal.builder(0).optional().build(), new BigDecimal("10")),
new SchemaAndValueField("dvs", VariableScaleDecimal.builder().optional().build(), dvs),
new SchemaAndValueField("n", Decimal.builder(4).optional().build(), new BigDecimal("22.2200")),
// DBZ-351 new SchemaAndValueField("nzs", Decimal.builder(0).optional().build(), new BigDecimal("22")),
new SchemaAndValueField("nvs", VariableScaleDecimal.builder().optional().build(), nvs)
);
}
protected List<SchemaAndValueField> schemasAndValuesForImpreciseNumericDecimalType() {
return Arrays.asList(
new SchemaAndValueField("d", Schema.OPTIONAL_FLOAT64_SCHEMA, 1.1d),
new SchemaAndValueField("dzs", Schema.OPTIONAL_FLOAT64_SCHEMA, 10d),
new SchemaAndValueField("dvs", Schema.OPTIONAL_FLOAT64_SCHEMA, 10.1111d),
new SchemaAndValueField("n", Schema.OPTIONAL_FLOAT64_SCHEMA, 22.22d),
new SchemaAndValueField("nzs", Schema.OPTIONAL_FLOAT64_SCHEMA, 22d),
new SchemaAndValueField("nvs", Schema.OPTIONAL_FLOAT64_SCHEMA, 22.2222d)
);
}
protected List<SchemaAndValueField> schemasAndValuesForStringTypes() {
return Arrays.asList(new SchemaAndValueField("vc", Schema.OPTIONAL_STRING_SCHEMA, "aa"),
new SchemaAndValueField("vcv", Schema.OPTIONAL_STRING_SCHEMA, "bb"),
@ -197,6 +225,8 @@ protected List<SchemaAndValueField> schemasAndValuesForTable(String insertTableS
switch (insertTableStatement) {
case INSERT_NUMERIC_TYPES_STMT:
return schemasAndValuesForNumericType();
case INSERT_NUMERIC_DECIMAL_TYPES_STMT:
return schemasAndValuesForNumericDecimalType();
case INSERT_BIN_TYPES_STMT:
return schemaAndValuesForBinTypes();
case INSERT_CASH_TYPES_STMT:
@ -285,11 +315,33 @@ private void assertValue(Struct content) {
assertEquals("Incorrect value type for " + fieldName, value.getClass(), actualValue.getClass());
if (actualValue instanceof byte[]) {
assertArrayEquals("Values don't match for " + fieldName, (byte[]) value, (byte[]) actualValue);
} else if (actualValue instanceof Struct) {
assertStruct((Struct)value, (Struct)actualValue);
} else {
assertEquals("Values don't match for " + fieldName, value, actualValue);
}
}
private void assertStruct(final Struct expectedStruct, final Struct actualStruct) {
expectedStruct.schema().fields().stream().forEach(field -> {
final Object expectedValue = actualStruct.get(field);
if (expectedValue == null) {
assertNull(fieldName + " is present in the actual content", actualStruct.get(field.name()));
return;
}
final Object actualValue = actualStruct.get(field.name());
assertNotNull("No value found for " + fieldName, actualValue);
assertEquals("Incorrect value type for " + fieldName, expectedValue.getClass(), actualValue.getClass());
if (actualValue instanceof byte[]) {
assertArrayEquals("Values don't match for " + fieldName, (byte[]) expectedValue, (byte[]) actualValue);
} else if (actualValue instanceof Struct) {
assertStruct((Struct)expectedValue, (Struct)actualValue);
} else {
assertEquals("Values don't match for " + fieldName, expectedValue, actualValue);
}
});
}
private void assertSchema(Struct content) {
if (schema == null) {
return;

View File

@ -0,0 +1,28 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql;
import static org.junit.Assert.assertEquals;
import java.math.BigDecimal;
import org.apache.kafka.connect.data.Struct;
import org.junit.Test;
import io.debezium.data.VariableScaleDecimal;
public class CustomTypeEncodingTest {
@Test
public void testVariableScaleDecimal() {
final BigDecimal testValue = new BigDecimal("138.456");
final Struct struct = VariableScaleDecimal.fromLogical(VariableScaleDecimal.schema(), testValue);
final BigDecimal decodedValue = VariableScaleDecimal.toLogical(struct);
assertEquals("Number should be same after serde", testValue, decodedValue);
}
}

View File

@ -27,6 +27,7 @@
import io.debezium.data.Bits;
import io.debezium.data.Json;
import io.debezium.data.Uuid;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.data.Xml;
import io.debezium.data.geometry.Point;
import io.debezium.relational.Table;
@ -48,8 +49,8 @@
*/
public class PostgresSchemaIT {
private static final String[] TEST_TABLES = new String[] { "public.numeric_table", "public.string_table", "public.cash_table",
"public.bitbin_table",
private static final String[] TEST_TABLES = new String[] { "public.numeric_table", "public.numeric_decimal_table", "public.string_table",
"public.cash_table","public.bitbin_table",
"public.time_table", "public.text_table", "public.geom_table", "public.tstzrange_table",
"public.array_table", "\"Quoted_\"\" . Schema\".\"Quoted_\"\" . Table\""
};
@ -69,10 +70,17 @@ public void shouldLoadSchemaForBuiltinPostgresTypes() throws Exception {
schema.refresh(connection, false);
assertTablesIncluded(TEST_TABLES);
Arrays.stream(TEST_TABLES).forEach(tableId -> assertKeySchema(tableId, "pk", Schema.INT32_SCHEMA));
assertTableSchema("public.numeric_table", "si, i, bi, d, n, r, db, ss, bs, b",
Schema.OPTIONAL_INT16_SCHEMA, Schema.OPTIONAL_INT32_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA,
Decimal.builder(2).optional().build(), Decimal.builder(4).optional().build(), Schema.OPTIONAL_FLOAT32_SCHEMA,
assertTableSchema("public.numeric_table", "si, i, bi, r, db, ss, bs, b",
Schema.OPTIONAL_INT16_SCHEMA, Schema.OPTIONAL_INT32_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_FLOAT32_SCHEMA,
Schema.OPTIONAL_FLOAT64_SCHEMA, Schema.INT16_SCHEMA, Schema.INT64_SCHEMA, Schema.OPTIONAL_BOOLEAN_SCHEMA);
assertTableSchema("public.numeric_decimal_table", "d, dzs, dvs, n, nzs, nvs",
Decimal.builder(2).optional().build(),
Decimal.builder(0).optional().build(),
VariableScaleDecimal.builder().optional().build(),
Decimal.builder(4).optional().build(),
Decimal.builder(0).optional().build(),
VariableScaleDecimal.builder().optional().build()
);
assertTableSchema("public.string_table", "vc, vcv, ch, c, t",
Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA,
Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);

View File

@ -16,7 +16,6 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
@ -68,6 +67,10 @@ public void shouldReceiveChangesForInsertsWithDifferentDataTypes() throws Except
//numerical types
assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericType());
//numerical decimal types
consumer.expects(1);
assertInsert(INSERT_NUMERIC_DECIMAL_TYPES_STMT, schemasAndValuesForNumericDecimalType());
// string types
consumer.expects(1);
assertInsert(INSERT_STRING_TYPES_STMT, schemasAndValuesForStringTypes());
@ -319,11 +322,7 @@ public void shouldReceiveNumericTypeAsDouble() throws Exception {
consumer = testConsumer(1);
recordsProducer.start(consumer);
List<SchemaAndValueField> schemasAndValuesForNumericType = schemasAndValuesForNumericType();
schemasAndValuesForNumericType.set(3, new SchemaAndValueField("d", Schema.OPTIONAL_FLOAT64_SCHEMA, 1.1d));
schemasAndValuesForNumericType.set(4, new SchemaAndValueField("n", Schema.OPTIONAL_FLOAT64_SCHEMA, 22.22d));
assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericType);
assertInsert(INSERT_NUMERIC_DECIMAL_TYPES_STMT, schemasAndValuesForImpreciseNumericDecimalType());
}
private void assertInsert(String statement, List<SchemaAndValueField> expectedSchemaAndValuesByColumn) {

View File

@ -3,7 +3,9 @@
-- Generate a number of tables to cover as many of the PG types as possible
DROP SCHEMA IF EXISTS public CASCADE;
CREATE SCHEMA public;
CREATE TABLE numeric_table (pk SERIAL, si SMALLINT, i INTEGER, bi BIGINT, d DECIMAL(3,2), n NUMERIC(6,4), r REAL, db DOUBLE PRECISION, ss SMALLSERIAL, bs BIGSERIAL, b BOOLEAN, PRIMARY KEY(pk));
CREATE TABLE numeric_table (pk SERIAL, si SMALLINT, i INTEGER, bi BIGINT, r REAL, db DOUBLE PRECISION, ss SMALLSERIAL, bs BIGSERIAL, b BOOLEAN, PRIMARY KEY(pk));
-- no suffix -fixed scale, zs - zero scale, vs - variable scale
CREATE TABLE numeric_decimal_table (pk SERIAL, d DECIMAL(3,2), dzs DECIMAL(4), dvs DECIMAL, n NUMERIC(6,4), nzs NUMERIC(4), nvs NUMERIC, PRIMARY KEY(pk));
CREATE TABLE string_table (pk SERIAL, vc VARCHAR(2), vcv CHARACTER VARYING(2), ch CHARACTER(4), c CHAR(3), t TEXT, PRIMARY KEY(pk));
CREATE TABLE cash_table (pk SERIAL, csh MONEY, PRIMARY KEY(pk));
CREATE TABLE bitbin_table (pk SERIAL, ba BYTEA, bol BIT(1), bs BIT(2), bv BIT VARYING(2) , PRIMARY KEY(pk));

View File

@ -0,0 +1,73 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.data;
import java.math.BigDecimal;
import java.math.BigInteger;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
/**
* An arbitrary precision decimal value with variable scale.
*
* @author Jiri Pechanec
*
*/
public class VariableScaleDecimal {
public static final String LOGICAL_NAME = "io.debezium.data.VariableScaleDecimal";
public static final String SCALE_FIELD = "scale";
public static final String VALUE_FIELD = "value";
/**
* Returns a {@link SchemaBuilder} for a VariableScaleDecimal. You can use the resulting SchemaBuilder
* to set additional schema settings such as required/optional, default value, and documentation.
*
* @return the schema builder
*/
public static SchemaBuilder builder() {
return SchemaBuilder.struct()
.name(LOGICAL_NAME)
.version(1)
.doc("Variable scaled decimal")
.field(SCALE_FIELD, Schema.INT32_SCHEMA)
.field(VALUE_FIELD, Schema.BYTES_SCHEMA);
}
/**
* Returns a Schema for a VariableScaleDecimal but with all other default Schema settings.
*
* @return the schema
* @see #builder()
*/
public static Schema schema() {
return builder().build();
}
/**
* Converts a value from its logical format (BigDecimal) to it's encoded format - a struct containing
* the scale of the number and a binary representation of the number
*
* @param value the logical value
* @return the encoded value
*/
public static Struct fromLogical(final Schema schema, final BigDecimal value) {
Struct result = new Struct(schema);
return result.put(SCALE_FIELD, value.scale()).put(VALUE_FIELD, value.unscaledValue().toByteArray());
}
/**
* Decodes the encoded value - see {@link #fromLogical(Schema, BigDecimal)} for encoding format
*
* @param value the encoded value
* @return the decoded value
*/
public static BigDecimal toLogical(final Struct value) {
return new BigDecimal(new BigInteger((byte[])value.getBytes(VALUE_FIELD)), value.getInt32(SCALE_FIELD));
}
}