DBZ-7920 Add support for 'infinity' values for PostgreSQL
This commit is contained in:
parent
39c7591cdb
commit
2b9ad53b1a
@ -165,6 +165,7 @@ protected void registerTypes() {
|
||||
super.registerTypes();
|
||||
|
||||
registerType(TimeWithTimezoneType.INSTANCE);
|
||||
registerType(ZonedTimestampType.INSTANCE);
|
||||
registerType(IntervalType.INSTANCE);
|
||||
registerType(SerialType.INSTANCE);
|
||||
registerType(BitType.INSTANCE);
|
||||
|
@ -0,0 +1,67 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.connector.jdbc.dialect.postgres;
|
||||
|
||||
import java.sql.Types;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
|
||||
import io.debezium.connector.jdbc.ValueBindDescriptor;
|
||||
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
|
||||
import io.debezium.connector.jdbc.type.Type;
|
||||
import io.debezium.time.ZonedTimestamp;
|
||||
|
||||
/**
|
||||
* An implementation of {@link Type} for {@link ZonedTimestamp} values specific to PostgreSQL.
|
||||
*
|
||||
* @author Mario Fiore Vitale
|
||||
*/
|
||||
public class ZonedTimestampType extends io.debezium.connector.jdbc.type.debezium.ZonedTimestampType {
|
||||
|
||||
public static final ZonedTimestampType INSTANCE = new ZonedTimestampType();
|
||||
public static final List<String> POSITIVE_INFINITY = List.of("infinity", "+infinity");
|
||||
public static final String NEGATIVE_INFINITY = "-infinity";
|
||||
|
||||
@Override
|
||||
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
|
||||
|
||||
if (POSITIVE_INFINITY.contains(value) || NEGATIVE_INFINITY.equals(value)) {
|
||||
return "cast(? as timestamptz)";
|
||||
}
|
||||
|
||||
return super.getQueryBinding(column, schema, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
|
||||
|
||||
if (value == null) {
|
||||
return List.of(new ValueBindDescriptor(index, null));
|
||||
}
|
||||
if (value instanceof String) {
|
||||
|
||||
final ZonedDateTime zdt;
|
||||
|
||||
if (POSITIVE_INFINITY.contains(value)) {
|
||||
return List.of(new ValueBindDescriptor(index, POSITIVE_INFINITY.get(0), Types.VARCHAR));
|
||||
}
|
||||
|
||||
if (NEGATIVE_INFINITY.equals(value)) {
|
||||
return List.of(new ValueBindDescriptor(index, NEGATIVE_INFINITY, Types.VARCHAR));
|
||||
}
|
||||
|
||||
zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId());
|
||||
|
||||
return List.of(new ValueBindDescriptor(index, zdt.toOffsetDateTime(), getJdbcType()));
|
||||
}
|
||||
|
||||
throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(),
|
||||
value, value.getClass().getName()));
|
||||
}
|
||||
}
|
@ -6,6 +6,8 @@
|
||||
package io.debezium.connector.jdbc.integration.postgres;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Arrays;
|
||||
import java.util.Base64;
|
||||
import java.util.List;
|
||||
@ -192,6 +194,46 @@ public void testInsertModeInsertWithPrimaryKeyModeUpperCaseColumnNameWithoutQuot
|
||||
getSink().assertColumnType(tableAssert, "nick_name$", ValueType.TEXT, "John Doe$", "John Doe$");
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
|
||||
@FixFor("DBZ-7920")
|
||||
public void testInsertModeInsertInfinityValues(SinkRecordFactory factory) throws SQLException {
|
||||
|
||||
final Map<String, String> properties = getDefaultSinkConfig();
|
||||
properties.put(JdbcSinkConnectorConfig.SCHEMA_EVOLUTION, SchemaEvolutionMode.BASIC.getValue());
|
||||
properties.put(JdbcSinkConnectorConfig.PRIMARY_KEY_MODE, PrimaryKeyMode.RECORD_VALUE.getValue());
|
||||
properties.put(JdbcSinkConnectorConfig.PRIMARY_KEY_FIELDS, "id");
|
||||
properties.put(JdbcSinkConnectorConfig.INSERT_MODE, InsertMode.INSERT.getValue());
|
||||
|
||||
startSinkConnector(properties);
|
||||
assertSinkConnectorIsRunning();
|
||||
|
||||
final String tableName = randomTableName();
|
||||
final String topicName = topicName("server1", "schema", tableName);
|
||||
|
||||
Schema zonedTimestampSchema = SchemaBuilder.string()
|
||||
.name("io.debezium.time.ZonedTimestamp")
|
||||
.build();
|
||||
|
||||
Schema rangeSchema = SchemaBuilder.string().build();
|
||||
|
||||
final SinkRecord createInfinityRecord = factory.createRecordWithSchemaValue(topicName, (byte) 1,
|
||||
List.of("timestamp_infinity-", "timestamp_infinity+", "range_with_infinity"),
|
||||
List.of(zonedTimestampSchema, zonedTimestampSchema, rangeSchema),
|
||||
Arrays.asList(new Object[]{ "-infinity", "+infinity", "[2010-01-01 14:30, +infinity)" }));
|
||||
consume(createInfinityRecord);
|
||||
|
||||
final TableAssert tableAssert = TestHelper.assertTable(dataSource(), destinationTableName(createInfinityRecord));
|
||||
tableAssert.exists().hasNumberOfRows(1).hasNumberOfColumns(4);
|
||||
|
||||
getSink().assertColumnType(tableAssert, "id", ValueType.NUMBER, (byte) 1);
|
||||
|
||||
getSink().assertColumnType(tableAssert, "timestamp_infinity-", Timestamp.class, Timestamp.valueOf(LocalDateTime.of(292269055, 12, 3, 0, 0, 0)));
|
||||
getSink().assertColumnType(tableAssert, "timestamp_infinity+", Timestamp.class, Timestamp.valueOf(LocalDateTime.of(292278994, 8, 17, 0, 0, 0)));
|
||||
getSink().assertColumnType(tableAssert, "range_with_infinity", String.class, "[2010-01-01 14:30, +infinity)");
|
||||
|
||||
}
|
||||
|
||||
private static Schema buildGeoTypeSchema(String type) {
|
||||
|
||||
SchemaBuilder schemaBuilder = SchemaBuilder.struct()
|
||||
|
Loading…
Reference in New Issue
Block a user