DBZ-337 Added ability to treat Postgres NUMERIC as double

This commit is contained in:
Steven Siahetiong 2017-06-20 08:09:01 +08:00 committed by Gunnar Morling
parent 9d2993f005
commit 6c34aea9d1
5 changed files with 119 additions and 6 deletions

View File

@ -6,10 +6,12 @@
package io.debezium.connector.postgresql;
import java.math.BigDecimal;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
@ -86,6 +88,72 @@ public static TemporalPrecisionMode parse(String value, String defaultValue) {
}
}
/**
 * The set of predefined options for how {@code DECIMAL} and {@code NUMERIC} column values
 * are represented in change events.
 */
public static enum DecimalHandlingMode implements EnumeratedValue {
    /**
     * Represent {@code DECIMAL} and {@code NUMERIC} values as precise {@link BigDecimal} values, which are
     * represented in change events in a binary form. This is precise but difficult to use.
     */
    PRECISE("precise"),

    /**
     * Represent {@code DECIMAL} and {@code NUMERIC} values as {@code double} values. This may be less precise
     * but is far easier to use.
     */
    DOUBLE("double");

    private final String value;

    private DecimalHandlingMode(String value) {
        this.value = value;
    }

    @Override
    public String getValue() {
        return value;
    }

    /**
     * Map this option onto the {@link DecimalMode} used by the JDBC value converters.
     *
     * @return {@link DecimalMode#DOUBLE} for {@link #DOUBLE}, otherwise {@link DecimalMode#PRECISE}
     */
    public DecimalMode asDecimalMode() {
        return this == DOUBLE ? DecimalMode.DOUBLE : DecimalMode.PRECISE;
    }

    /**
     * Determine if the supplied value is one of the predefined options.
     *
     * @param value the configuration property value; may be null
     * @return the matching option, or null if no match is found
     */
    public static DecimalHandlingMode parse(String value) {
        if (value == null) {
            return null;
        }
        String normalized = value.trim();
        for (DecimalHandlingMode option : values()) {
            if (option.getValue().equalsIgnoreCase(normalized)) {
                return option;
            }
        }
        return null;
    }

    /**
     * Determine if the supplied value is one of the predefined options, falling back to the
     * supplied default when the value itself does not match.
     *
     * @param value the configuration property value; may be null
     * @param defaultValue the default value; may be null
     * @return the matching option, or null if neither the value nor a non-null default matches
     */
    public static DecimalHandlingMode parse(String value, String defaultValue) {
        DecimalHandlingMode mode = parse(value);
        if (mode == null && defaultValue != null) {
            mode = parse(defaultValue);
        }
        return mode;
    }
}
/**
* The set of predefined SnapshotMode options or aliases.
*/
@ -552,6 +620,15 @@ public static TopicSelectionStrategy parse(String value) {
+ "'connect' always represents time, date, and timestamp values using Kafka Connect's built-in representations for Time, Date, and Timestamp, "
+ "which uses millisecond precision regardless of the database columns' precision .");
public static final Field DECIMAL_HANDLING_MODE = Field.create("decimal.handling.mode")
.withDisplayName("Decimal Handling")
.withEnum(DecimalHandlingMode.class, DecimalHandlingMode.PRECISE)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDescription("Specify how DECIMAL and NUMERIC columns should be represented in change events, including:"
+ "'precise' (the default) uses java.math.BigDecimal to represent values, which are encoded in the change events using a binary representation and Kafka Connect's 'org.apache.kafka.connect.data.Decimal' type; "
+ "'double' represents values using Java's 'double', which may not offer the precision but will be far easier to use in consumers.");
public static final Field STATUS_UPDATE_INTERVAL_MS = Field.create("status.update.interval.ms")
.withDisplayName("Status update interval (ms)")
.withType(Type.INT) // Postgres doesn't accept long for this value
@ -577,7 +654,7 @@ public static TopicSelectionStrategy parse(String value) {
MAX_QUEUE_SIZE, POLL_INTERVAL_MS, SCHEMA_WHITELIST,
SCHEMA_BLACKLIST, TABLE_WHITELIST, TABLE_BLACKLIST,
COLUMN_BLACKLIST, SNAPSHOT_MODE,
TIME_PRECISION_MODE,
TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE,
SSL_MODE, SSL_CLIENT_CERT, SSL_CLIENT_KEY_PASSWORD,
SSL_ROOT_CERT, SSL_CLIENT_KEY, SNAPSHOT_LOCK_TIMEOUT_MS, ROWS_FETCH_SIZE, SSL_SOCKET_FACTORY,
STATUS_UPDATE_INTERVAL_MS, TCP_KEEPALIVE);
@ -585,6 +662,7 @@ public static TopicSelectionStrategy parse(String value) {
private final Configuration config;
private final String serverName;
private final boolean adaptiveTimePrecision;
private final DecimalMode decimalHandlingMode;
private final SnapshotMode snapshotMode;
protected PostgresConnectorConfig(Configuration config) {
@ -596,6 +674,9 @@ protected PostgresConnectorConfig(Configuration config) {
this.serverName = serverName;
TemporalPrecisionMode timePrecisionMode = TemporalPrecisionMode.parse(config.getString(TIME_PRECISION_MODE));
this.adaptiveTimePrecision = TemporalPrecisionMode.ADAPTIVE == timePrecisionMode;
String decimalHandlingModeStr = config.getString(PostgresConnectorConfig.DECIMAL_HANDLING_MODE);
DecimalHandlingMode decimalHandlingMode = DecimalHandlingMode.parse(decimalHandlingModeStr);
this.decimalHandlingMode = decimalHandlingMode.asDecimalMode();
this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE));
}
@ -643,6 +724,10 @@ protected boolean adaptiveTimePrecision() {
return adaptiveTimePrecision;
}
/**
 * Returns the mode in which {@code DECIMAL} and {@code NUMERIC} column values are to be
 * represented in change events, as resolved from the {@code decimal.handling.mode}
 * configuration property in the constructor.
 *
 * @return the decimal handling mode; never null given a valid configuration
 */
protected DecimalMode decimalHandlingMode() {
return decimalHandlingMode;
}
/**
 * Returns the subset of this connector's configuration that holds the JDBC connection
 * settings, i.e. the properties under the database config prefix.
 * NOTE(review): the {@code true} flag presumably strips the prefix from the resulting
 * property names — confirm against {@code Configuration.subset}.
 *
 * @return the JDBC-related configuration subset
 */
protected Configuration jdbcConfig() {
return config.subset(DATABASE_CONFIG_PREFIX, true);
}
@ -713,7 +798,7 @@ protected static ConfigDef configDef() {
Field.group(config, "Events", SCHEMA_WHITELIST, SCHEMA_BLACKLIST, TABLE_WHITELIST, TABLE_BLACKLIST,
COLUMN_BLACKLIST);
Field.group(config, "Connector", TOPIC_SELECTION_STRATEGY, POLL_INTERVAL_MS, MAX_BATCH_SIZE, MAX_QUEUE_SIZE,
SNAPSHOT_MODE, SNAPSHOT_LOCK_TIMEOUT_MS, TIME_PRECISION_MODE, ROWS_FETCH_SIZE);
SNAPSHOT_MODE, SNAPSHOT_LOCK_TIMEOUT_MS, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE, ROWS_FETCH_SIZE);
return config;
}

View File

@ -61,7 +61,8 @@ protected PostgresSchema(PostgresConnectorConfig config) {
this.filters = new Filters(config);
this.tables = new Tables();
PostgresValueConverter valueConverter = new PostgresValueConverter(config.adaptiveTimePrecision(), ZoneOffset.UTC);
PostgresValueConverter valueConverter = new PostgresValueConverter(config.decimalHandlingMode(), config.adaptiveTimePrecision(),
ZoneOffset.UTC);
this.schemaNameValidator = AvroValidator.create(LOGGER)::validate;
this.schemaBuilder = new TableSchemaBuilder(valueConverter, this.schemaNameValidator);

View File

@ -54,8 +54,8 @@ public class PostgresValueConverter extends JdbcValueConverters {
*/
protected static final double DAYS_PER_MONTH_AVG = 365.25 / 12.0d;
protected PostgresValueConverter(boolean adaptiveTimePrecision, ZoneOffset defaultOffset) {
super(DecimalMode.PRECISE, adaptiveTimePrecision, defaultOffset, null);
protected PostgresValueConverter(DecimalMode decimalMode, boolean adaptiveTimePrecision, ZoneOffset defaultOffset) {
super(decimalMode, adaptiveTimePrecision, defaultOffset, null);
}
@Override
@ -167,7 +167,12 @@ public ValueConverter converter(Column column, Field fieldDefn) {
case PgOid.MONEY:
return data -> convertMoney(column, fieldDefn, data);
case PgOid.NUMERIC:
return data -> convertDecimal(column, fieldDefn, data);
switch (decimalMode) {
case DOUBLE:
return (data) -> convertDouble(column, fieldDefn, data);
case PRECISE:
return (data) -> convertDecimal(column, fieldDefn, data);
}
case PgOid.INT2_ARRAY:
case PgOid.INT4_ARRAY:
case PgOid.INT8_ARRAY:

View File

@ -145,6 +145,7 @@ public void shouldValidateConfiguration() throws Exception {
validateField(validatedConfig, PostgresConnectorConfig.SNAPSHOT_MODE, INITIAL);
validateField(validatedConfig, PostgresConnectorConfig.SNAPSHOT_LOCK_TIMEOUT_MS, PostgresConnectorConfig.DEFAULT_SNAPSHOT_LOCK_TIMEOUT_MILLIS);
validateField(validatedConfig, PostgresConnectorConfig.TIME_PRECISION_MODE, PostgresConnectorConfig.TemporalPrecisionMode.ADAPTIVE);
validateField(validatedConfig, PostgresConnectorConfig.DECIMAL_HANDLING_MODE, PostgresConnectorConfig.DecimalHandlingMode.PRECISE);
validateField(validatedConfig, PostgresConnectorConfig.SSL_SOCKET_FACTORY, null);
validateField(validatedConfig, PostgresConnectorConfig.TCP_KEEPALIVE, null);
}

View File

@ -16,6 +16,7 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
@ -305,6 +306,26 @@ record = consumer.remove();
VerifyRecord.isValidTombstone(record, PK_FIELD, 2);
}
@Test
public void shouldReceiveNumericTypeAsDouble() throws Exception {
// Build a connector config with decimal.handling.mode=DOUBLE so DECIMAL/NUMERIC columns
// are emitted as Java doubles instead of the default precise BigDecimal encoding.
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
.with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, PostgresConnectorConfig.DecimalHandlingMode.DOUBLE)
.build());
PostgresTaskContext context = new PostgresTaskContext(config, new PostgresSchema(config));
recordsProducer = new RecordsStreamProducer(context, new SourceInfo(config.serverName()));
// Create the test tables and expect exactly one change record from the insert below.
TestHelper.executeDDL("postgres_create_tables.ddl");
consumer = testConsumer(1);
recordsProducer.start(consumer);
// Start from the default expectations for the numeric-types row, then override the
// entries at indexes 3 and 4 (columns "d" and "n" — presumably the DECIMAL/NUMERIC
// columns of the test table; verify against the DDL) to expect optional float64
// schemas with double values rather than the precise representation.
List<SchemaAndValueField> schemasAndValuesForNumericType = schemasAndValuesForNumericType();
schemasAndValuesForNumericType.set(3, new SchemaAndValueField("d", Schema.OPTIONAL_FLOAT64_SCHEMA, 1.1d));
schemasAndValuesForNumericType.set(4, new SchemaAndValueField("n", Schema.OPTIONAL_FLOAT64_SCHEMA, 22.22d));
assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericType);
}
private void assertInsert(String statement, List<SchemaAndValueField> expectedSchemaAndValuesByColumn) {
TableId table = tableIdFromInsertStmt(statement);
String expectedTopicName = table.schema() + "." + table.table();