diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java index acd8dcaee..8acb9e971 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java @@ -221,8 +221,7 @@ public SchemaBuilder schemaBuilder(Column column) { case PgOid.POINT: return Point.builder(); case PgOid.MONEY: - // Money has always scale 2 - return Decimal.builder(2); + return moneySchema(); case PgOid.NUMERIC: return numericSchema(column); case PgOid.BYTEA: @@ -367,6 +366,20 @@ private SchemaBuilder hstoreSchema() { } } + private SchemaBuilder moneySchema() { + switch (decimalMode) { + case DOUBLE: + return SchemaBuilder.float64(); + case PRECISE: + // Money has always scale 2 + return Decimal.builder(2); + case STRING: + return SchemaBuilder.string(); + default: + throw new IllegalArgumentException("Unknown decimalMode"); + } + } + @Override public ValueConverter converter(Column column, Field fieldDefn) { int oidValue = column.nativeType(); @@ -403,7 +416,7 @@ public ValueConverter converter(Column column, Field fieldDefn) { case PgOid.POINT: return data -> convertPoint(column, fieldDefn, data); case PgOid.MONEY: - return data -> convertMoney(column, fieldDefn, data); + return data -> convertMoney(column, fieldDefn, data, decimalMode); case PgOid.NUMERIC: return (data) -> convertDecimal(column, fieldDefn, data, decimalMode); case PgOid.BYTEA: @@ -722,14 +735,31 @@ protected Object convertBits(Column column, Field fieldDefn, Object data, int nu return super.convertBits(column, fieldDefn, data, numBytes); } - protected Object convertMoney(Column column, Field fieldDefn, Object data) { + protected Object convertMoney(Column column, Field fieldDefn, Object data, DecimalMode mode) { return convertValue(column, fieldDefn, data, BigDecimal.ZERO.setScale(2), (r) -> { - if (data instanceof Double) { - r.deliver(BigDecimal.valueOf((Double) data).setScale(2)); - } - else if (data instanceof Number) { - // the plugin will return a 64bit signed integer where the last 2 are always decimals - r.deliver(BigDecimal.valueOf(((Number) data).longValue(), 2)); + switch (mode) { + case DOUBLE: + if (data instanceof Double) { + r.deliver(data); + } + else if (data instanceof Number) { + r.deliver(((Number) data).doubleValue()); + } + break; + case PRECISE: + if (data instanceof Double) { + r.deliver(BigDecimal.valueOf((Double) data).setScale(2)); + } + else if (data instanceof Number) { + // the plugin will return a 64bit signed integer where the last 2 are always decimals + r.deliver(BigDecimal.valueOf(((Number) data).longValue(), 2)); + } + break; + case STRING: + r.deliver(String.valueOf(data)); + break; + default: + throw new IllegalArgumentException("Unknown decimalMode"); } }); } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java index 92fec3b9c..d2d3c6a66 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java @@ -13,7 +13,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import java.math.BigDecimal; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -25,7 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; @@ -623,16 +621,16 @@ public void shouldSnapshotDomainTypeWithPropagatedSourceTypeAttributes() throws consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS); List expected = Arrays.asList( - new SchemaAndValueField("salary", Decimal.builder(2).optional() + new SchemaAndValueField("salary", SchemaBuilder.float64().optional() .parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "MONEY") .parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, String.valueOf(Integer.MAX_VALUE)) .parameter(TestHelper.TYPE_SCALE_PARAMETER_KEY, "0") - .build(), BigDecimal.valueOf(7.25)), - new SchemaAndValueField("salary2", Decimal.builder(2).optional() + .build(), 7.25), + new SchemaAndValueField("salary2", SchemaBuilder.float64().optional() .parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "MONEY2") .parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, String.valueOf(Integer.MAX_VALUE)) .parameter(TestHelper.TYPE_SCALE_PARAMETER_KEY, "0") - .build(), BigDecimal.valueOf(8.25)), + .build(), 8.25), new SchemaAndValueField("a", SchemaBuilder.float64().optional() .parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "NUMERIC") .parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "8") diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java index da9e89e15..0aabd8604 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java @@ -2081,7 +2081,7 @@ public void shouldStreamChangesForDataTypeAlias() throws Exception { TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, data VARCHAR(50), salary money, salary2 money2, PRIMARY KEY(pk));"); startConnector(config -> config - .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE) + .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.PRECISE) .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true) .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.alias_table"), @@ -2134,8 +2134,8 @@ public void shouldStreamChangesForDomainAliasAlterTable() throws Exception { List expected = Arrays.asList( new SchemaAndValueField("pk", SchemaBuilder.int32().defaultValue(0).build(), 1), new SchemaAndValueField("data", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "hello"), - new SchemaAndValueField("salary", Decimal.builder(2).optional().build(), new BigDecimal(7.25)), - new SchemaAndValueField("salary2", Decimal.builder(2).build(), new BigDecimal(8.25)), + new SchemaAndValueField("salary", SchemaBuilder.OPTIONAL_FLOAT64_SCHEMA, 7.25), + new SchemaAndValueField("salary2", SchemaBuilder.FLOAT64_SCHEMA, 8.25), new SchemaAndValueField("salary3", SchemaBuilder.float64() .parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "MONEY3") .parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "8") @@ -2807,6 +2807,89 @@ record = consumer.remove(); } } + @Test + @FixFor("DBZ-1931") + public void testStreamMoneyAsDefaultPrecise() throws Exception { + TestHelper.execute("CREATE TABLE salary (pk SERIAL, name VARCHAR(50), salary money, PRIMARY KEY(pk));"); + + startConnector(config -> config + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) + .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.salary"), + false); + + waitForStreamingToStart(); + + consumer = testConsumer(1); + executeAndWait("INSERT INTO salary (name, salary) values ('Joe', 123.45);"); + + SourceRecord rec = assertRecordInserted("public.salary", PK_FIELD, 1); + assertSourceInfo(rec, "postgres", "public", "salary"); + + List expected = Arrays.asList( + new SchemaAndValueField("pk", SchemaBuilder.int32().defaultValue(0).build(), 1), + new SchemaAndValueField("name", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "Joe"), + new SchemaAndValueField("salary", Decimal.builder(2).optional().build(), BigDecimal.valueOf(123.45))); + + assertRecordSchemaAndValues(expected, rec, Envelope.FieldName.AFTER); + assertThat(consumer.isEmpty()).isTrue(); + } + + @Test + @FixFor("DBZ-1931") + public void testStreamMoneyAsString() throws Exception { + TestHelper.execute("CREATE TABLE salary (pk SERIAL, name VARCHAR(50), salary money, PRIMARY KEY(pk));"); + + startConnector(config -> config + .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.STRING) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) + .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.salary"), + false); + + waitForStreamingToStart(); + + consumer = testConsumer(1); + executeAndWait("INSERT INTO salary (name, salary) values ('Joe', 123.45);"); + + SourceRecord rec = assertRecordInserted("public.salary", PK_FIELD, 1); + assertSourceInfo(rec, "postgres", "public", "salary"); + + List expected = Arrays.asList( + new SchemaAndValueField("pk", SchemaBuilder.int32().defaultValue(0).build(), 1), + new SchemaAndValueField("name", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "Joe"), + new SchemaAndValueField("salary", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "123.45")); + + assertRecordSchemaAndValues(expected, rec, Envelope.FieldName.AFTER); + assertThat(consumer.isEmpty()).isTrue(); + } + + @Test + @FixFor("DBZ-1931") + public void testStreamMoneyAsDouble() throws Exception { + TestHelper.execute("CREATE TABLE salary (pk SERIAL, name VARCHAR(50), salary money, PRIMARY KEY(pk));"); + + startConnector(config -> config + .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) + .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.salary"), + false); + + waitForStreamingToStart(); + + consumer = testConsumer(1); + executeAndWait("INSERT INTO salary (name, salary) values ('Joe', 123.45);"); + + SourceRecord rec = assertRecordInserted("public.salary", PK_FIELD, 1); + assertSourceInfo(rec, "postgres", "public", "salary"); + + List expected = Arrays.asList( + new SchemaAndValueField("pk", SchemaBuilder.int32().defaultValue(0).build(), 1), + new SchemaAndValueField("name", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "Joe"), + new SchemaAndValueField("salary", SchemaBuilder.OPTIONAL_FLOAT64_SCHEMA, 123.45)); + + assertRecordSchemaAndValues(expected, rec, Envelope.FieldName.AFTER); + assertThat(consumer.isEmpty()).isTrue(); + } + private void assertHeartBeatRecordInserted() { assertFalse("records not generated", consumer.isEmpty());