DBZ-7920 Add support for timestamp infinity values for MySQL

This commit is contained in:
mfvitale 2024-06-26 14:47:55 +02:00 committed by Fiore Mario Vitale
parent acabb806d3
commit 370badd9e7
10 changed files with 171 additions and 58 deletions

View File

@ -360,6 +360,20 @@ default String getTimeQueryBinding() {
*/
String getTypeName(int jdbcType, Size size);
/**
* +Infinity value for a timestamp.
*
* @return the +infinity representation for timestamp.
*/
String getTimestampPositiveInfinityValue();
/**
* -Infinity value for a timestamp.
*
* @return the -infinity representation for timestamp.
*/
String getTimestampNegativeInfinityValue();
/**
* Bind the specified value to the query.
*

View File

@ -10,6 +10,8 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Instant;
import java.sql.Types;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
@ -526,6 +528,16 @@ public String getTypeName(int jdbcType, Size size) {
return ddlTypeRegistry.getTypeName(jdbcType, size);
}
@Override
public String getTimestampPositiveInfinityValue() {
return Timestamp.from(Instant.MAX).toString();
}
@Override
public String getTimestampNegativeInfinityValue() {
return Timestamp.from(Instant.MIN).toString();
}
@Override
public String getByteArrayFormat() {
return "x'%s'";

View File

@ -141,6 +141,16 @@ public String getFormattedTimestampWithTimeZone(String value) {
return String.format("'%s'", DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(zonedDateTime));
}
@Override
public String getTimestampPositiveInfinityValue() {
return "2038-01-19T03:14:07+00:00";
}
@Override
public String getTimestampNegativeInfinityValue() {
return "1970-01-01T00:00:01+00:00";
}
@Override
public String getAlterTablePrefix() {
return "ADD COLUMN (";

View File

@ -6,13 +6,10 @@
package io.debezium.connector.jdbc.dialect.mysql;
import java.sql.Types;
import java.time.ZonedDateTime;
import java.util.List;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import io.debezium.connector.jdbc.ValueBindDescriptor;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.time.ZonedTimestamp;
@ -26,19 +23,13 @@ public class ZonedTimestampType extends io.debezium.connector.jdbc.type.debezium
public static final ZonedTimestampType INSTANCE = new ZonedTimestampType();
@Override
public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
if (value == null) {
return List.of(new ValueBindDescriptor(index, null));
}
if (value instanceof String) {
return getDialect().getQueryBindingWithValueCast(column, schema, this);
}
final ZonedDateTime zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId());
return List.of(new ValueBindDescriptor(index, zdt.toOffsetDateTime(), Types.TIMESTAMP)); // TIMESTAMP_WITH_TIMEZONE not supported
}
throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(),
value, value.getClass().getName()));
@Override
protected int getJdbcBindType() {
return Types.TIMESTAMP; // TIMESTAMP_WITH_TIMEZONE not supported
}
}

View File

@ -5,6 +5,9 @@
*/
package io.debezium.connector.jdbc.dialect.postgres;
import static io.debezium.connector.jdbc.type.debezium.ZonedTimestampType.NEGATIVE_INFINITY;
import static io.debezium.connector.jdbc.type.debezium.ZonedTimestampType.POSITIVE_INFINITY;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.format.DateTimeFormatter;
@ -212,4 +215,14 @@ protected String resolveColumnNameFromField(String fieldName) {
}
return columnName;
}
@Override
public String getTimestampPositiveInfinityValue() {
return POSITIVE_INFINITY;
}
@Override
public String getTimestampNegativeInfinityValue() {
return NEGATIVE_INFINITY;
}
}

View File

@ -6,14 +6,9 @@
package io.debezium.connector.jdbc.dialect.postgres;
import java.sql.Types;
import java.time.ZonedDateTime;
import java.util.List;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import io.debezium.connector.jdbc.ValueBindDescriptor;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.time.ZonedTimestamp;
@ -25,43 +20,15 @@
public class ZonedTimestampType extends io.debezium.connector.jdbc.type.debezium.ZonedTimestampType {
public static final ZonedTimestampType INSTANCE = new ZonedTimestampType();
public static final List<String> POSITIVE_INFINITY = List.of("infinity", "+infinity");
public static final String NEGATIVE_INFINITY = "-infinity";
@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
protected List<ValueBindDescriptor> infinityTimestampValue(int index, Object value) {
if (POSITIVE_INFINITY.contains(value) || NEGATIVE_INFINITY.equals(value)) {
return "cast(? as timestamptz)";
if (POSITIVE_INFINITY.equals(value)) {
return List.of(new ValueBindDescriptor(index, POSITIVE_INFINITY, Types.VARCHAR));
}
return super.getQueryBinding(column, schema, value);
}
@Override
public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
if (value == null) {
return List.of(new ValueBindDescriptor(index, null));
else {
return List.of(new ValueBindDescriptor(index, NEGATIVE_INFINITY, Types.VARCHAR));
}
if (value instanceof String) {
final ZonedDateTime zdt;
if (POSITIVE_INFINITY.contains(value)) {
return List.of(new ValueBindDescriptor(index, POSITIVE_INFINITY.get(0), Types.VARCHAR));
}
if (NEGATIVE_INFINITY.equals(value)) {
return List.of(new ValueBindDescriptor(index, NEGATIVE_INFINITY, Types.VARCHAR));
}
zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId());
return List.of(new ValueBindDescriptor(index, zdt.toOffsetDateTime(), getJdbcType()));
}
throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(),
value, value.getClass().getName()));
}
}

View File

@ -14,6 +14,7 @@
import io.debezium.connector.jdbc.ValueBindDescriptor;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.AbstractTimestampType;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.time.ZonedTimestamp;
@ -26,6 +27,8 @@
public class ZonedTimestampType extends AbstractTimestampType {
public static final ZonedTimestampType INSTANCE = new ZonedTimestampType();
public static final String POSITIVE_INFINITY = "infinity";
public static final String NEGATIVE_INFINITY = "-infinity";
@Override
public String[] getRegistrationKeys() {
@ -37,6 +40,16 @@ public String getDefaultValueBinding(DatabaseDialect dialect, Schema schema, Obj
return dialect.getFormattedTimestampWithTimeZone((String) value);
}
@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
if (POSITIVE_INFINITY.equals(value) || NEGATIVE_INFINITY.equals(value)) {
return "cast(? as timestamptz)";
}
return super.getQueryBinding(column, schema, value);
}
@Override
public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
@ -45,15 +58,44 @@ public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
}
if (value instanceof String) {
final ZonedDateTime zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId());
if (POSITIVE_INFINITY.equals(value) || NEGATIVE_INFINITY.equals(value)) {
return infinityTimestampValue(index, value);
}
return List.of(new ValueBindDescriptor(index, zdt.toOffsetDateTime(), getJdbcType()));
return normalTimestampValue(index, value);
}
throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(),
value, value.getClass().getName()));
}
protected List<ValueBindDescriptor> infinityTimestampValue(int index, Object value) {
final ZonedDateTime zdt;
if (POSITIVE_INFINITY.equals(value)) {
zdt = ZonedDateTime.parse(getDialect().getTimestampPositiveInfinityValue(), ZonedTimestamp.FORMATTER)
.withZoneSameInstant(getDatabaseTimeZone().toZoneId());
}
else {
zdt = ZonedDateTime.parse(getDialect().getTimestampNegativeInfinityValue(), ZonedTimestamp.FORMATTER)
.withZoneSameInstant(getDatabaseTimeZone().toZoneId());
}
return List.of(new ValueBindDescriptor(index, zdt.toOffsetDateTime(), getJdbcBindType()));
}
protected List<ValueBindDescriptor> normalTimestampValue(int index, Object value) {
final ZonedDateTime zdt;
zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId());
return List.of(new ValueBindDescriptor(index, zdt.toOffsetDateTime(), getJdbcBindType()));
}
protected int getJdbcBindType() {
return getJdbcType();
}
@Override
protected int getJdbcType() {
return Types.TIMESTAMP_WITH_TIMEZONE;

View File

@ -2568,6 +2568,35 @@ else if (sink.getType().is(SinkType.MYSQL)) {
ResultSet::getString);
}
@TestTemplate
@ForSource(value = { SourceType.POSTGRES }, reason = "The infinity value is valid only for PostgreSQL")
@WithTemporalPrecisionMode
public void testTimestampWithTimeZoneDataTypeWithInfinityValue(Source source, Sink sink) throws Exception {
final List<String> values = List.of("'-infinity'", "'infinity'");
List<ZonedDateTime> expectedValues = List.of();
if (sink.getType().is(SinkType.POSTGRES)) {
// expectedValues = values;
}
else if (sink.getType().is(SinkType.MYSQL)) {
expectedValues = List.of(ZonedDateTime.of(1970, 1, 1, 0, 0, 1, 0, ZoneOffset.UTC),
ZonedDateTime.of(2038, 1, 19, 3, 14, 7, 0, ZoneOffset.UTC));
}
assertDataTypesNonKeyOnly(source,
sink,
List.of("timestamptz", "timestamptz"),
values,
expectedValues,
(record) -> {
assertColumn(sink, record, "data0", getTimestampWithTimezoneType(source, false, 6));
assertColumn(sink, record, "data1", getTimestampWithTimezoneType(source, false, 6));
},
(rs, index) -> rs.getTimestamp(index).toInstant().atZone(ZoneOffset.UTC));
}
// todo: remaining data types need tests and/or type system mapping support
// GEOMETRY (MySql/PostgreSQL)
// LINESTRING (MySQL)

View File

@ -5,13 +5,24 @@
*/
package io.debezium.connector.jdbc.e2e;
import io.debezium.connector.jdbc.junit.jupiter.Sink;
import io.debezium.connector.jdbc.junit.jupiter.SinkType;
import io.debezium.connector.jdbc.junit.jupiter.e2e.ForSource;
import io.debezium.connector.jdbc.junit.jupiter.e2e.WithTemporalPrecisionMode;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import io.debezium.connector.jdbc.junit.jupiter.PostgresSinkDatabaseContextProvider;
import io.debezium.connector.jdbc.junit.jupiter.e2e.source.Source;
import io.debezium.connector.jdbc.junit.jupiter.e2e.source.SourceType;
import java.sql.ResultSet;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.stream.Collectors;
/**
* Implementation of the JDBC sink connector multi-source pipeline that writes to PostgreSQL.
*
@ -184,4 +195,28 @@ protected String getTimestampWithTimezoneType(Source source, boolean key, int pr
protected String getIntervalType(Source source, boolean numeric) {
return "INTERVAL";
}
@TestTemplate
@ForSource(value = { SourceType.POSTGRES }, reason = "The infinity value is valid only for PostgreSQL")
@WithTemporalPrecisionMode
@Override
public void testTimestampWithTimeZoneDataTypeWithInfinityValue(Source source, Sink sink) throws Exception {
final List<String> values = List.of("'-infinity'", "'infinity'");
List<String> expectedValues = values.stream()
.map(s -> s.replace("'", ""))
.collect(Collectors.toList());
assertDataTypesNonKeyOnly(source,
sink,
List.of("timestamptz", "timestamptz"),
values,
expectedValues,
(record) -> {
assertColumn(sink, record, "data0", getTimestampWithTimezoneType(source, false, 6));
assertColumn(sink, record, "data1", getTimestampWithTimezoneType(source, false, 6));
},
ResultSet::getString);
}
}

View File

@ -220,7 +220,7 @@ public void testInsertModeInsertInfinityValues(SinkRecordFactory factory) throws
final SinkRecord createInfinityRecord = factory.createRecordWithSchemaValue(topicName, (byte) 1,
List.of("timestamp_infinity-", "timestamp_infinity+", "range_with_infinity"),
List.of(zonedTimestampSchema, zonedTimestampSchema, rangeSchema),
Arrays.asList(new Object[]{ "-infinity", "+infinity", "[2010-01-01 14:30, +infinity)" }));
Arrays.asList(new Object[]{ "-infinity", "infinity", "[2010-01-01 14:30, infinity)" }));
consume(createInfinityRecord);
final TableAssert tableAssert = TestHelper.assertTable(dataSource(), destinationTableName(createInfinityRecord));
@ -230,7 +230,7 @@ public void testInsertModeInsertInfinityValues(SinkRecordFactory factory) throws
getSink().assertColumnType(tableAssert, "timestamp_infinity-", Timestamp.class, new Timestamp(PGStatement.DATE_NEGATIVE_INFINITY));
getSink().assertColumnType(tableAssert, "timestamp_infinity+", Timestamp.class, new Timestamp(PGStatement.DATE_POSITIVE_INFINITY));
getSink().assertColumnType(tableAssert, "range_with_infinity", String.class, "[2010-01-01 14:30, +infinity)");
getSink().assertColumnType(tableAssert, "range_with_infinity", String.class, "[2010-01-01 14:30, infinity)");
}