diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java
index 02a606746..60c9b42a4 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java
@@ -6,10 +6,12 @@
 package io.debezium.connector.postgresql;
 
+import java.math.BigDecimal;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
+import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
@@ -86,6 +88,72 @@ public static TemporalPrecisionMode parse(String value, String defaultValue) {
         }
     }
 
+    /**
+     * The set of predefined DecimalHandlingMode options or aliases.
+     */
+    public static enum DecimalHandlingMode implements EnumeratedValue {
+        /**
+         * Represent {@code DECIMAL} and {@code NUMERIC} values as precise {@link BigDecimal} values, which are
+         * represented in change events in a binary form. This is precise but difficult to use.
+         */
+        PRECISE("precise"),
+
+        /**
+         * Represent {@code DECIMAL} and {@code NUMERIC} values as approximate {@code double} values. This may be
+         * less precise but is far easier to use.
+         */
+        DOUBLE("double");
+
+        private final String value;
+
+        private DecimalHandlingMode(String value) {
+            this.value = value;
+        }
+
+        @Override
+        public String getValue() {
+            return value;
+        }
+
+        public DecimalMode asDecimalMode() {
+            switch (this) {
+                case DOUBLE:
+                    return DecimalMode.DOUBLE;
+                case PRECISE:
+                default:
+                    return DecimalMode.PRECISE;
+            }
+        }
+
+        /**
+         * Determine if the supplied value is one of the predefined options.
+         *
+         * @param value the configuration property value; may be null
+         * @return the matching option, or null if no match is found
+         */
+        public static DecimalHandlingMode parse(String value) {
+            if (value == null) return null;
+            value = value.trim();
+            for (DecimalHandlingMode option : DecimalHandlingMode.values()) {
+                if (option.getValue().equalsIgnoreCase(value)) return option;
+            }
+            return null;
+        }
+
+        /**
+         * Determine if the supplied value is one of the predefined options.
+         *
+         * @param value the configuration property value; may be null
+         * @param defaultValue the default value; may be null
+         * @return the matching option, or null if no match is found and the non-null default is invalid
+         */
+        public static DecimalHandlingMode parse(String value, String defaultValue) {
+            DecimalHandlingMode mode = parse(value);
+            if (mode == null && defaultValue != null) mode = parse(defaultValue);
+            return mode;
+        }
+    }
+
     /**
      * The set of predefined SnapshotMode options or aliases.
     */
@@ -552,6 +620,15 @@ public static TopicSelectionStrategy parse(String value) {
                                                         + "'connect' always represents time, date, and timestamp values using Kafka Connect's built-in representations for Time, Date, and Timestamp, "
                                                         + "which uses millisecond precision regardless of the database columns' precision .");
 
+    public static final Field DECIMAL_HANDLING_MODE = Field.create("decimal.handling.mode")
+                                                            .withDisplayName("Decimal Handling")
+                                                            .withEnum(DecimalHandlingMode.class, DecimalHandlingMode.PRECISE)
+                                                            .withWidth(Width.SHORT)
+                                                            .withImportance(Importance.MEDIUM)
+                                                            .withDescription("Specify how DECIMAL and NUMERIC columns should be represented in change events, including: "
+                                                                    + "'precise' (the default) uses java.math.BigDecimal to represent values, which are encoded in the change events using a binary representation and Kafka Connect's 'org.apache.kafka.connect.data.Decimal' type; "
+                                                                    + "'double' represents values using Java's 'double', which may not offer the same precision but is far easier to use in consumers.");
+
     public static final Field STATUS_UPDATE_INTERVAL_MS = Field.create("status.update.interval.ms")
                                                                 .withDisplayName("Status update interval (ms)")
                                                                 .withType(Type.INT) // Postgres doesn't accept long for this value
@@ -577,7 +654,7 @@ public static TopicSelectionStrategy parse(String value) {
                                                      MAX_QUEUE_SIZE, POLL_INTERVAL_MS, SCHEMA_WHITELIST,
                                                      SCHEMA_BLACKLIST, TABLE_WHITELIST, TABLE_BLACKLIST,
                                                      COLUMN_BLACKLIST, SNAPSHOT_MODE,
-                                                     TIME_PRECISION_MODE,
+                                                     TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE,
                                                      SSL_MODE, SSL_CLIENT_CERT, SSL_CLIENT_KEY_PASSWORD, SSL_ROOT_CERT, SSL_CLIENT_KEY,
                                                      SNAPSHOT_LOCK_TIMEOUT_MS, ROWS_FETCH_SIZE, SSL_SOCKET_FACTORY,
                                                      STATUS_UPDATE_INTERVAL_MS, TCP_KEEPALIVE);
@@ -585,6 +662,7 @@ public static TopicSelectionStrategy parse(String value) {
     private final Configuration config;
     private final String serverName;
     private final boolean adaptiveTimePrecision;
+    private final DecimalMode decimalHandlingMode;
     private final SnapshotMode snapshotMode;
 
     protected PostgresConnectorConfig(Configuration config) {
@@ -596,6 +674,9 @@ protected PostgresConnectorConfig(Configuration config) {
         this.serverName = serverName;
         TemporalPrecisionMode timePrecisionMode = TemporalPrecisionMode.parse(config.getString(TIME_PRECISION_MODE));
         this.adaptiveTimePrecision = TemporalPrecisionMode.ADAPTIVE == timePrecisionMode;
+        String decimalHandlingModeStr = config.getString(PostgresConnectorConfig.DECIMAL_HANDLING_MODE);
+        DecimalHandlingMode decimalHandlingMode = DecimalHandlingMode.parse(decimalHandlingModeStr);
+        this.decimalHandlingMode = decimalHandlingMode.asDecimalMode();
         this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE));
     }
 
@@ -643,6 +724,10 @@ protected boolean adaptiveTimePrecision() {
         return adaptiveTimePrecision;
     }
 
+    protected DecimalMode decimalHandlingMode() {
+        return decimalHandlingMode;
+    }
+
     protected Configuration jdbcConfig() {
         return config.subset(DATABASE_CONFIG_PREFIX, true);
     }
@@ -713,7 +798,7 @@ protected static ConfigDef configDef() {
         Field.group(config, "Events", SCHEMA_WHITELIST, SCHEMA_BLACKLIST, TABLE_WHITELIST, TABLE_BLACKLIST,
                     COLUMN_BLACKLIST);
         Field.group(config, "Connector", TOPIC_SELECTION_STRATEGY, POLL_INTERVAL_MS, MAX_BATCH_SIZE, MAX_QUEUE_SIZE,
-                    SNAPSHOT_MODE, SNAPSHOT_LOCK_TIMEOUT_MS, TIME_PRECISION_MODE, ROWS_FETCH_SIZE);
+                    SNAPSHOT_MODE, SNAPSHOT_LOCK_TIMEOUT_MS, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE, ROWS_FETCH_SIZE);
         return config;
     }
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java
index e1c03ff4d..ed7e39bb1 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java
@@ -61,7 +61,8 @@ protected PostgresSchema(PostgresConnectorConfig config) {
         this.filters = new Filters(config);
         this.tables = new Tables();
 
-        PostgresValueConverter valueConverter = new PostgresValueConverter(config.adaptiveTimePrecision(), ZoneOffset.UTC);
+        PostgresValueConverter valueConverter = new PostgresValueConverter(config.decimalHandlingMode(), config.adaptiveTimePrecision(),
+                                                                           ZoneOffset.UTC);
 
         this.schemaNameValidator = AvroValidator.create(LOGGER)::validate;
         this.schemaBuilder = new TableSchemaBuilder(valueConverter, this.schemaNameValidator);
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java
index 0a75b2bfd..2b37f63e8 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java
@@ -54,8 +54,8 @@ public class PostgresValueConverter extends JdbcValueConverters {
      */
     protected static final double DAYS_PER_MONTH_AVG = 365.25 / 12.0d;
 
-    protected PostgresValueConverter(boolean adaptiveTimePrecision, ZoneOffset defaultOffset) {
-        super(DecimalMode.PRECISE, adaptiveTimePrecision, defaultOffset, null);
+    protected PostgresValueConverter(DecimalMode decimalMode, boolean adaptiveTimePrecision, ZoneOffset defaultOffset) {
+        super(decimalMode, adaptiveTimePrecision, defaultOffset, null);
     }
 
     @Override
@@ -167,7 +167,12 @@ public ValueConverter converter(Column column, Field fieldDefn) {
             case PgOid.MONEY:
                 return data -> convertMoney(column, fieldDefn, data);
             case PgOid.NUMERIC:
-                return data -> convertDecimal(column, fieldDefn, data);
+                switch (decimalMode) {
+                    case DOUBLE:
+                        return data -> convertDouble(column, fieldDefn, data);
+                    case PRECISE:
+                        return data -> convertDecimal(column, fieldDefn, data);
+                }
             case PgOid.INT2_ARRAY:
             case PgOid.INT4_ARRAY:
             case PgOid.INT8_ARRAY:
diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java
index ceaf4b870..94d98eb8a 100644
--- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java
+++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java
@@ -145,6 +145,7 @@ public void shouldValidateConfiguration() throws Exception {
         validateField(validatedConfig, PostgresConnectorConfig.SNAPSHOT_MODE, INITIAL);
         validateField(validatedConfig, PostgresConnectorConfig.SNAPSHOT_LOCK_TIMEOUT_MS, PostgresConnectorConfig.DEFAULT_SNAPSHOT_LOCK_TIMEOUT_MILLIS);
         validateField(validatedConfig, PostgresConnectorConfig.TIME_PRECISION_MODE, PostgresConnectorConfig.TemporalPrecisionMode.ADAPTIVE);
+        validateField(validatedConfig, PostgresConnectorConfig.DECIMAL_HANDLING_MODE, PostgresConnectorConfig.DecimalHandlingMode.PRECISE);
         validateField(validatedConfig, PostgresConnectorConfig.SSL_SOCKET_FACTORY, null);
         validateField(validatedConfig, PostgresConnectorConfig.TCP_KEEPALIVE, null);
     }
diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java
index 067ced160..dfeea5087 100644
--- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java
+++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java
@@ -16,6 +16,7 @@
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.junit.After;
@@ -305,6 +306,26 @@ record = consumer.remove();
         VerifyRecord.isValidTombstone(record, PK_FIELD, 2);
     }
 
+    @Test
+    public void shouldReceiveNumericTypeAsDouble() throws Exception {
+        PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
+                .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, PostgresConnectorConfig.DecimalHandlingMode.DOUBLE)
+                .build());
+        PostgresTaskContext context = new PostgresTaskContext(config, new PostgresSchema(config));
+        recordsProducer = new RecordsStreamProducer(context, new SourceInfo(config.serverName()));
+
+        TestHelper.executeDDL("postgres_create_tables.ddl");
+
+        consumer = testConsumer(1);
+        recordsProducer.start(consumer);
+
+        List schemasAndValuesForNumericType = schemasAndValuesForNumericType();
+        schemasAndValuesForNumericType.set(3, new SchemaAndValueField("d", Schema.OPTIONAL_FLOAT64_SCHEMA, 1.1d));
+        schemasAndValuesForNumericType.set(4, new SchemaAndValueField("n", Schema.OPTIONAL_FLOAT64_SCHEMA, 22.22d));
+
+        assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericType);
+    }
+
     private void assertInsert(String statement, List expectedSchemaAndValuesByColumn) {
         TableId table = tableIdFromInsertStmt(statement);
         String expectedTopicName = table.schema() + "." + table.table();
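For reviewers, a minimal sketch of what the two decimal.handling.mode settings mean on the consuming side; the helper class, the struct variable, and the field name "n" are illustrative only (the column name merely mirrors the one used in shouldReceiveNumericTypeAsDouble() above) and are not part of this change. With 'precise' (the default) a NUMERIC column surfaces as Kafka Connect's org.apache.kafka.connect.data.Decimal logical type, i.e. a java.math.BigDecimal carried on the wire as bytes plus a scale; with 'double' it surfaces as a plain optional FLOAT64.

import java.math.BigDecimal;

import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;

public class DecimalHandlingExample {

    /**
     * Reads a NUMERIC-backed field from a change event's value struct, handling both
     * decimal.handling.mode settings. The field name "n" is only an example.
     */
    static Number readNumericField(Struct value) {
        Schema fieldSchema = value.schema().field("n").schema();
        if (Decimal.LOGICAL_NAME.equals(fieldSchema.name())) {
            // decimal.handling.mode=precise: Connect's Decimal logical type, i.e. a
            // BigDecimal serialized as variable-length bytes with the scale kept in
            // the schema parameters.
            return (BigDecimal) value.get("n");
        }
        // decimal.handling.mode=double: the converter emits an optional FLOAT64, so the
        // value is a plain java.lang.Double (possibly lossy for high-precision NUMERICs).
        return (Double) value.get("n");
    }
}

In short, 'double' trades BigDecimal's exactness for a value consumers can use directly, without decoding the binary Decimal representation.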