DBZ-1931 Reflect decimal.handling.mode in money conversion

This commit is contained in:
Vojtech Juranek 2021-11-14 21:13:08 +01:00 committed by Gunnar Morling
parent 593359e3f8
commit 6f286fe50c
3 changed files with 130 additions and 19 deletions

View File

@@ -221,8 +221,7 @@ public SchemaBuilder schemaBuilder(Column column) {
case PgOid.POINT: case PgOid.POINT:
return Point.builder(); return Point.builder();
case PgOid.MONEY: case PgOid.MONEY:
// Money has always scale 2 return moneySchema();
return Decimal.builder(2);
case PgOid.NUMERIC: case PgOid.NUMERIC:
return numericSchema(column); return numericSchema(column);
case PgOid.BYTEA: case PgOid.BYTEA:
@@ -367,6 +366,20 @@ private SchemaBuilder hstoreSchema() {
} }
} }
/**
 * Builds the schema used for PostgreSQL {@code MONEY} columns, honoring the
 * connector's {@code decimal.handling.mode} setting (DBZ-1931) instead of
 * unconditionally using a Decimal(2) schema.
 *
 * @return a {@code float64}, {@code Decimal(2)}, or {@code string} schema builder,
 *         depending on the configured decimal mode
 * @throws IllegalArgumentException if the decimal mode is not one of the known values
 */
private SchemaBuilder moneySchema() {
    switch (decimalMode) {
        case DOUBLE:
            return SchemaBuilder.float64();
        case PRECISE:
            // PostgreSQL money always has a scale of 2
            return Decimal.builder(2);
        case STRING:
            return SchemaBuilder.string();
        default:
            // Include the offending value so a misconfiguration is easy to diagnose
            throw new IllegalArgumentException("Unknown decimalMode: " + decimalMode);
    }
}
@Override @Override
public ValueConverter converter(Column column, Field fieldDefn) { public ValueConverter converter(Column column, Field fieldDefn) {
int oidValue = column.nativeType(); int oidValue = column.nativeType();
@@ -403,7 +416,7 @@ public ValueConverter converter(Column column, Field fieldDefn) {
case PgOid.POINT: case PgOid.POINT:
return data -> convertPoint(column, fieldDefn, data); return data -> convertPoint(column, fieldDefn, data);
case PgOid.MONEY: case PgOid.MONEY:
return data -> convertMoney(column, fieldDefn, data); return data -> convertMoney(column, fieldDefn, data, decimalMode);
case PgOid.NUMERIC: case PgOid.NUMERIC:
return (data) -> convertDecimal(column, fieldDefn, data, decimalMode); return (data) -> convertDecimal(column, fieldDefn, data, decimalMode);
case PgOid.BYTEA: case PgOid.BYTEA:
@@ -722,14 +735,31 @@ protected Object convertBits(Column column, Field fieldDefn, Object data, int numBytes) {
return super.convertBits(column, fieldDefn, data, numBytes); return super.convertBits(column, fieldDefn, data, numBytes);
} }
protected Object convertMoney(Column column, Field fieldDefn, Object data) { protected Object convertMoney(Column column, Field fieldDefn, Object data, DecimalMode mode) {
return convertValue(column, fieldDefn, data, BigDecimal.ZERO.setScale(2), (r) -> { return convertValue(column, fieldDefn, data, BigDecimal.ZERO.setScale(2), (r) -> {
if (data instanceof Double) { switch (mode) {
r.deliver(BigDecimal.valueOf((Double) data).setScale(2)); case DOUBLE:
} if (data instanceof Double) {
else if (data instanceof Number) { r.deliver(data);
// the plugin will return a 64bit signed integer where the last 2 are always decimals }
r.deliver(BigDecimal.valueOf(((Number) data).longValue(), 2)); else if (data instanceof Number) {
r.deliver(((Number) data).doubleValue());
}
break;
case PRECISE:
if (data instanceof Double) {
r.deliver(BigDecimal.valueOf((Double) data).setScale(2));
}
else if (data instanceof Number) {
// the plugin will return a 64bit signed integer where the last 2 are always decimals
r.deliver(BigDecimal.valueOf(((Number) data).longValue(), 2));
}
break;
case STRING:
r.deliver(String.valueOf(data));
break;
default:
throw new IllegalArgumentException("Unknown decimalMode");
} }
}); });
} }

View File

@@ -13,7 +13,6 @@
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import java.math.BigDecimal;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
@@ -25,7 +24,6 @@
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
@@ -623,16 +621,16 @@ public void shouldSnapshotDomainTypeWithPropagatedSourceTypeAttributes() throws
consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS); consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
List<SchemaAndValueField> expected = Arrays.asList( List<SchemaAndValueField> expected = Arrays.asList(
new SchemaAndValueField("salary", Decimal.builder(2).optional() new SchemaAndValueField("salary", SchemaBuilder.float64().optional()
.parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "MONEY") .parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "MONEY")
.parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, String.valueOf(Integer.MAX_VALUE)) .parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, String.valueOf(Integer.MAX_VALUE))
.parameter(TestHelper.TYPE_SCALE_PARAMETER_KEY, "0") .parameter(TestHelper.TYPE_SCALE_PARAMETER_KEY, "0")
.build(), BigDecimal.valueOf(7.25)), .build(), 7.25),
new SchemaAndValueField("salary2", Decimal.builder(2).optional() new SchemaAndValueField("salary2", SchemaBuilder.float64().optional()
.parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "MONEY2") .parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "MONEY2")
.parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, String.valueOf(Integer.MAX_VALUE)) .parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, String.valueOf(Integer.MAX_VALUE))
.parameter(TestHelper.TYPE_SCALE_PARAMETER_KEY, "0") .parameter(TestHelper.TYPE_SCALE_PARAMETER_KEY, "0")
.build(), BigDecimal.valueOf(8.25)), .build(), 8.25),
new SchemaAndValueField("a", SchemaBuilder.float64().optional() new SchemaAndValueField("a", SchemaBuilder.float64().optional()
.parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "NUMERIC") .parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "NUMERIC")
.parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "8") .parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "8")

View File

@@ -2081,7 +2081,7 @@ public void shouldStreamChangesForDataTypeAlias() throws Exception {
TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, data VARCHAR(50), salary money, salary2 money2, PRIMARY KEY(pk));"); TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, data VARCHAR(50), salary money, salary2 money2, PRIMARY KEY(pk));");
startConnector(config -> config startConnector(config -> config
.with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE) .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.PRECISE)
.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true) .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.alias_table"), .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.alias_table"),
@@ -2134,8 +2134,8 @@ public void shouldStreamChangesForDomainAliasAlterTable() throws Exception {
List<SchemaAndValueField> expected = Arrays.asList( List<SchemaAndValueField> expected = Arrays.asList(
new SchemaAndValueField("pk", SchemaBuilder.int32().defaultValue(0).build(), 1), new SchemaAndValueField("pk", SchemaBuilder.int32().defaultValue(0).build(), 1),
new SchemaAndValueField("data", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "hello"), new SchemaAndValueField("data", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "hello"),
new SchemaAndValueField("salary", Decimal.builder(2).optional().build(), new BigDecimal(7.25)), new SchemaAndValueField("salary", SchemaBuilder.OPTIONAL_FLOAT64_SCHEMA, 7.25),
new SchemaAndValueField("salary2", Decimal.builder(2).build(), new BigDecimal(8.25)), new SchemaAndValueField("salary2", SchemaBuilder.FLOAT64_SCHEMA, 8.25),
new SchemaAndValueField("salary3", SchemaBuilder.float64() new SchemaAndValueField("salary3", SchemaBuilder.float64()
.parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "MONEY3") .parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "MONEY3")
.parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "8") .parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "8")
@@ -2807,6 +2807,89 @@ record = consumer.remove();
} }
} }
// DBZ-1931: with decimal.handling.mode left unset, the default ('precise') applies,
// so a money column must be emitted as a connect Decimal with scale 2.
@Test
@FixFor("DBZ-1931")
public void testStreamMoneyAsDefaultPrecise() throws Exception {
TestHelper.execute("CREATE TABLE salary (pk SERIAL, name VARCHAR(50), salary money, PRIMARY KEY(pk));");
// No DECIMAL_HANDLING_MODE set here — exercises the default path.
startConnector(config -> config
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.salary"),
false);
waitForStreamingToStart();
consumer = testConsumer(1);
executeAndWait("INSERT INTO salary (name, salary) values ('Joe', 123.45);");
SourceRecord rec = assertRecordInserted("public.salary", PK_FIELD, 1);
assertSourceInfo(rec, "postgres", "public", "salary");
List<SchemaAndValueField> expected = Arrays.asList(
new SchemaAndValueField("pk", SchemaBuilder.int32().defaultValue(0).build(), 1),
new SchemaAndValueField("name", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "Joe"),
// money always has scale 2, hence Decimal.builder(2)
new SchemaAndValueField("salary", Decimal.builder(2).optional().build(), BigDecimal.valueOf(123.45)));
assertRecordSchemaAndValues(expected, rec, Envelope.FieldName.AFTER);
assertThat(consumer.isEmpty()).isTrue();
}
// DBZ-1931: with decimal.handling.mode=string, a money column must be emitted
// as a plain string value ("123.45") with an optional string schema.
@Test
@FixFor("DBZ-1931")
public void testStreamMoneyAsString() throws Exception {
TestHelper.execute("CREATE TABLE salary (pk SERIAL, name VARCHAR(50), salary money, PRIMARY KEY(pk));");
startConnector(config -> config
.with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.STRING)
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.salary"),
false);
waitForStreamingToStart();
consumer = testConsumer(1);
executeAndWait("INSERT INTO salary (name, salary) values ('Joe', 123.45);");
SourceRecord rec = assertRecordInserted("public.salary", PK_FIELD, 1);
assertSourceInfo(rec, "postgres", "public", "salary");
List<SchemaAndValueField> expected = Arrays.asList(
new SchemaAndValueField("pk", SchemaBuilder.int32().defaultValue(0).build(), 1),
new SchemaAndValueField("name", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "Joe"),
new SchemaAndValueField("salary", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "123.45"));
assertRecordSchemaAndValues(expected, rec, Envelope.FieldName.AFTER);
assertThat(consumer.isEmpty()).isTrue();
}
// DBZ-1931: with decimal.handling.mode=double, a money column must be emitted
// as a float64 value rather than a connect Decimal.
@Test
@FixFor("DBZ-1931")
public void testStreamMoneyAsDouble() throws Exception {
TestHelper.execute("CREATE TABLE salary (pk SERIAL, name VARCHAR(50), salary money, PRIMARY KEY(pk));");
startConnector(config -> config
.with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE)
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.salary"),
false);
waitForStreamingToStart();
consumer = testConsumer(1);
executeAndWait("INSERT INTO salary (name, salary) values ('Joe', 123.45);");
SourceRecord rec = assertRecordInserted("public.salary", PK_FIELD, 1);
assertSourceInfo(rec, "postgres", "public", "salary");
List<SchemaAndValueField> expected = Arrays.asList(
new SchemaAndValueField("pk", SchemaBuilder.int32().defaultValue(0).build(), 1),
new SchemaAndValueField("name", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "Joe"),
new SchemaAndValueField("salary", SchemaBuilder.OPTIONAL_FLOAT64_SCHEMA, 123.45));
assertRecordSchemaAndValues(expected, rec, Envelope.FieldName.AFTER);
assertThat(consumer.isEmpty()).isTrue();
}
private void assertHeartBeatRecordInserted() { private void assertHeartBeatRecordInserted() {
assertFalse("records not generated", consumer.isEmpty()); assertFalse("records not generated", consumer.isEmpty());