DBZ-7920 Add support for timestamp infinity values for Oracle
This commit is contained in:
parent
a035a2e4bf
commit
4fd96e86af
@ -11,8 +11,8 @@
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.Instant;
|
||||
import java.sql.Types;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneOffset;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.time.temporal.TemporalAccessor;
|
||||
|
@ -151,6 +151,16 @@ public String getFormattedTimestampWithTimeZone(String value) {
|
||||
return String.format(TO_TIMESTAMP_FF9_TZ, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTimestampPositiveInfinityValue() {
|
||||
return "9999-12-31T23:59:59+00:00";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTimestampNegativeInfinityValue() {
|
||||
return "-4712-01-01T00:00:00+00:00";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String resolveColumnNameFromField(String fieldName) {
|
||||
String columnName = super.resolveColumnNameFromField(fieldName);
|
||||
|
@ -5,16 +5,10 @@
|
||||
*/
|
||||
package io.debezium.connector.jdbc.dialect.oracle;
|
||||
|
||||
import java.sql.Types;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
|
||||
import io.debezium.connector.jdbc.ValueBindDescriptor;
|
||||
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
|
||||
import io.debezium.connector.jdbc.type.AbstractTimestampType;
|
||||
import io.debezium.connector.jdbc.type.Type;
|
||||
import io.debezium.time.ZonedTimestamp;
|
||||
|
||||
@ -23,40 +17,28 @@
|
||||
*
|
||||
* @author Chris Cranford
|
||||
*/
|
||||
public class ZonedTimestampType extends AbstractTimestampType {
|
||||
public class ZonedTimestampType extends io.debezium.connector.jdbc.type.debezium.ZonedTimestampType {
|
||||
|
||||
public static final ZonedTimestampType INSTANCE = new ZonedTimestampType();
|
||||
|
||||
@Override
|
||||
public String[] getRegistrationKeys() {
|
||||
return new String[]{ ZonedTimestamp.SCHEMA_NAME };
|
||||
}
|
||||
protected List<ValueBindDescriptor> infinityTimestampValue(int index, Object value) {
|
||||
final ZonedDateTime zdt;
|
||||
|
||||
@Override
|
||||
public String getDefaultValueBinding(DatabaseDialect dialect, Schema schema, Object value) {
|
||||
return dialect.getFormattedTimestampWithTimeZone((String) value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
|
||||
|
||||
if (value == null) {
|
||||
return List.of(new ValueBindDescriptor(index, null));
|
||||
if (POSITIVE_INFINITY.equals(value)) {
|
||||
zdt = ZonedDateTime.parse(getDialect().getTimestampPositiveInfinityValue(), ZonedTimestamp.FORMATTER);
|
||||
}
|
||||
if (value instanceof String) {
|
||||
|
||||
final ZonedDateTime zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId());
|
||||
|
||||
return List.of(new ValueBindDescriptor(index, zdt, getJdbcType()));
|
||||
else {
|
||||
zdt = ZonedDateTime.parse(getDialect().getTimestampNegativeInfinityValue(), ZonedTimestamp.FORMATTER);
|
||||
}
|
||||
|
||||
throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(),
|
||||
value, value.getClass().getName()));
|
||||
return List.of(new ValueBindDescriptor(index, zdt, getJdbcBindType()));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getJdbcType() {
|
||||
return Types.TIMESTAMP_WITH_TIMEZONE;
|
||||
}
|
||||
protected List<ValueBindDescriptor> normalTimestampValue(int index, Object value) {
|
||||
|
||||
final ZonedDateTime zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId());
|
||||
|
||||
return List.of(new ValueBindDescriptor(index, zdt, getJdbcBindType()));
|
||||
}
|
||||
}
|
||||
|
@ -36,6 +36,7 @@
|
||||
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.apache.kafka.connect.sink.SinkRecord;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.TestTemplate;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
@ -2575,16 +2576,7 @@ public void testTimestampWithTimeZoneDataTypeWithInfinityValue(Source source, Si
|
||||
|
||||
final List<String> values = List.of("'-infinity'", "'infinity'");
|
||||
|
||||
List<ZonedDateTime> expectedValues = List.of();
|
||||
if (sink.getType().is(SinkType.SQLSERVER)) {
|
||||
|
||||
expectedValues = List.of(ZonedDateTime.of(1, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC),
|
||||
ZonedDateTime.of(9999, 12, 31, 23, 59, 59, 0, ZoneOffset.UTC));
|
||||
}
|
||||
else if (sink.getType().is(SinkType.MYSQL)) {
|
||||
expectedValues = List.of(ZonedDateTime.of(1970, 1, 1, 0, 0, 1, 0, ZoneOffset.UTC),
|
||||
ZonedDateTime.of(2038, 1, 19, 3, 14, 7, 0, ZoneOffset.UTC));
|
||||
}
|
||||
List<ZonedDateTime> expectedValues = getExpectedZonedDateTimes(sink);
|
||||
|
||||
assertDataTypesNonKeyOnly(source,
|
||||
sink,
|
||||
@ -2598,6 +2590,30 @@ else if (sink.getType().is(SinkType.MYSQL)) {
|
||||
(rs, index) -> rs.getTimestamp(index).toInstant().atZone(ZoneOffset.UTC));
|
||||
}
|
||||
|
||||
private static @NotNull List<ZonedDateTime> getExpectedZonedDateTimes(Sink sink) {
|
||||
|
||||
List<ZonedDateTime> expectedValues = List.of();
|
||||
if (sink.getType().is(SinkType.SQLSERVER)) {
|
||||
|
||||
expectedValues = List.of(ZonedDateTime.of(1, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC),
|
||||
ZonedDateTime.of(9999, 12, 31, 23, 59, 59, 0, ZoneOffset.UTC));
|
||||
}
|
||||
else if (sink.getType().is(SinkType.MYSQL)) {
|
||||
|
||||
expectedValues = List.of(ZonedDateTime.of(1970, 1, 1, 0, 0, 1, 0, ZoneOffset.UTC),
|
||||
ZonedDateTime.of(2038, 1, 19, 3, 14, 7, 0, ZoneOffset.UTC));
|
||||
}
|
||||
else if (sink.getType().is(SinkType.ORACLE)) {
|
||||
|
||||
// The value read by the rs.getTimestamp() is correct but then the
|
||||
// rs.getTimestamp().toInstant() will return -4712-11-24. I suspect a bug somewhere in the time library.
|
||||
// The value on the DB is correct since select to_char(A , 'AD YYYY-MM-DD HH24:MI:SS') will return BC 4712-01-01 00:00:00
|
||||
expectedValues = List.of(ZonedDateTime.of(-4712, 11, 24, 0, 0, 0, 0, ZoneOffset.UTC),
|
||||
ZonedDateTime.of(9999, 12, 31, 23, 59, 59, 0, ZoneOffset.UTC));
|
||||
}
|
||||
return expectedValues;
|
||||
}
|
||||
|
||||
// todo: remaining data types need tests and/or type system mapping support
|
||||
// GEOMETRY (MySql/PostgreSQL)
|
||||
// LINESTRING (MySQL)
|
||||
|
Loading…
Reference in New Issue
Block a user