DBZ-6317 Support timestamps without timezone for batch

This commit is contained in:
mfvitale 2023-11-02 19:30:07 +01:00 committed by Jiri Pechanec
parent b24cd359e4
commit 2f0750f450
12 changed files with 97 additions and 42 deletions

View File

@ -230,15 +230,36 @@ public interface DatabaseDialect {
* *
* @return LocalDataTime if zoned date is not supported; ZonedDateTime otherwise (the default). * @return LocalDataTime if zoned date is not supported; ZonedDateTime otherwise (the default).
*/ */
Object convertToCorrectDateTime(ZonedDateTime zonedDateTime); Object convertToCorrectZonedTimestamp(ZonedDateTime zonedDateTime);
/** /**
* Returns the correct date type for the database. * Returns the correct date type for the database.
* *
* @return the objects returned depends on the database dialect. * @return the object returned depends on the database dialect.
*/ */
Object convertToCorrectDate(LocalDate localDate); Object convertToCorrectDate(LocalDate localDate);
/**
* Returns the correct time type for the database.
*
* @return the object returned depends on the database dialect.
*/
Object convertToCorrectTimestamp(ZonedDateTime zonedDateTime);
/**
* Returns the correct time type for the database.
*
* @return the object returned depends on the database dialect.
*/
Object convertToCorrectTime(ZonedDateTime zonedDateTime);
/**
* Returns a time with timezone if is supported, without timezone if not supported by the database.
*
* @return LocalDataTime if zoned date is not supported; ZonedDateTime otherwise (the default).
*/ // TODO fix docs
Optional<Integer> getZonedTimestampType();
/** /**
* Returns a time with timezone if is supported, without timezone if not supported by the database. * Returns a time with timezone if is supported, without timezone if not supported by the database.
* *

View File

@ -633,7 +633,7 @@ protected void registerTypes() {
} }
@Override @Override
public Object convertToCorrectDateTime(ZonedDateTime zonedTime) { public Object convertToCorrectZonedTimestamp(ZonedDateTime zonedTime) {
return zonedTime; return zonedTime;
} }
@ -643,10 +643,25 @@ public Object convertToCorrectDate(LocalDate localDate) {
} }
@Override @Override
public Optional<Integer> getTimestampType() { public Object convertToCorrectTimestamp(ZonedDateTime zonedDateTime) {
return zonedDateTime.toLocalDateTime();
}
@Override
public Object convertToCorrectTime(ZonedDateTime zonedDateTime) {
return zonedDateTime.toLocalDateTime().toLocalTime();
}
@Override
public Optional<Integer> getZonedTimestampType() {
return Optional.of(Types.TIMESTAMP_WITH_TIMEZONE); return Optional.of(Types.TIMESTAMP_WITH_TIMEZONE);
} }
@Override
public Optional<Integer> getTimestampType() {
return Optional.of(Types.TIMESTAMP);
}
protected void registerType(Type type) { protected void registerType(Type type) {
type.configure(connectorConfig, this); type.configure(connectorConfig, this);
for (String key : type.getRegistrationKeys()) { for (String key : type.getRegistrationKeys()) {

View File

@ -6,6 +6,7 @@
package io.debezium.connector.jdbc.dialect.db2; package io.debezium.connector.jdbc.dialect.db2;
import java.sql.Date; import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
@ -203,17 +204,32 @@ public String getTruncateStatement(TableDescriptor table) {
return builder.build(); return builder.build();
} }
@Override
public Optional<Integer> getZonedTimestampType() {
return Optional.empty();
}
@Override @Override
public Optional<Integer> getTimestampType() { public Optional<Integer> getTimestampType() {
return Optional.empty(); return Optional.empty();
} }
@Override @Override
public Object convertToCorrectDateTime(ZonedDateTime zonedTime) { public Object convertToCorrectZonedTimestamp(ZonedDateTime zonedTime) {
return Timestamp.from(zonedTime.toInstant()); return Timestamp.from(zonedTime.toInstant());
} }
public Object convertToCorrectDate(LocalDate localDate) { public Object convertToCorrectDate(LocalDate localDate) {
return Date.valueOf(localDate); return Date.valueOf(localDate);
} }
@Override
public Object convertToCorrectTimestamp(ZonedDateTime zonedDateTime) {
return Timestamp.valueOf(zonedDateTime.toLocalDateTime());
}
@Override
public Object convertToCorrectTime(ZonedDateTime zonedDateTime) {
return Time.valueOf(zonedDateTime.toLocalDateTime().toLocalTime());
}
} }

View File

@ -7,9 +7,11 @@
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Types;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder; import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.Temporal;
import java.time.temporal.TemporalAccessor; import java.time.temporal.TemporalAccessor;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
@ -190,7 +192,13 @@ protected void addColumnDefaultValue(SinkRecordDescriptor.FieldDescriptor field,
} }
@Override @Override
public Optional<Integer> getTimestampType() { public Optional<Integer> getZonedTimestampType() {
return Optional.empty(); return Optional.of(Types.TIMESTAMP); // TIMESTAMP_WITH_TIMEZONE not supported for MySQL
}
@Override
public Temporal convertToCorrectZonedTimestamp(ZonedDateTime zonedTime) {
return zonedTime.toOffsetDateTime();
} }
} }

View File

@ -7,7 +7,6 @@
import java.sql.Connection; import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Types;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.time.temporal.Temporal; import java.time.temporal.Temporal;
@ -215,7 +214,7 @@ protected String resolveColumnNameFromField(String fieldName) {
} }
@Override @Override
public Temporal convertToCorrectDateTime(ZonedDateTime zonedTime) { public Temporal convertToCorrectZonedTimestamp(ZonedDateTime zonedTime) {
// TODO tested only with PostgreSQL check with others Databases // TODO tested only with PostgreSQL check with others Databases
// For timestamp with time zone, the internally stored value is always in UTC (Universal Coordinated Time, traditionally known as Greenwich Mean Time, GMT). // For timestamp with time zone, the internally stored value is always in UTC (Universal Coordinated Time, traditionally known as Greenwich Mean Time, GMT).
// An input value that has an explicit time zone specified is converted to UTC using the appropriate offset for that time zone. // An input value that has an explicit time zone specified is converted to UTC using the appropriate offset for that time zone.
@ -226,9 +225,4 @@ public Temporal convertToCorrectDateTime(ZonedDateTime zonedTime) {
// https://www.postgresql.org/docs/current/datatype-datetime.html // https://www.postgresql.org/docs/current/datatype-datetime.html
return zonedTime.toOffsetDateTime(); return zonedTime.toOffsetDateTime();
} }
@Override
public Optional<Integer> getTimestampType() {
return Optional.of(Types.TIMESTAMP_WITH_TIMEZONE);
}
} }

View File

@ -153,8 +153,12 @@ public String getByteArrayFormat() {
} }
@Override @Override
public Temporal convertToCorrectDateTime(ZonedDateTime zonedTime) { public Temporal convertToCorrectTime(ZonedDateTime zonedTime) {
return zonedTime.toOffsetDateTime(); return zonedTime.toOffsetDateTime();
} }
public Object convertToCorrectZonedTimestamp(ZonedDateTime zonedTime) {
return zonedTime.toOffsetDateTime();
}
} }

View File

@ -6,9 +6,8 @@
package io.debezium.connector.jdbc.type.connect; package io.debezium.connector.jdbc.type.connect;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime; import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
@ -56,11 +55,9 @@ public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
if (value instanceof Date) { if (value instanceof Date) {
final LocalTime localTime = DateTimeUtils.toLocalTimeFromUtcDate((Date) value); final LocalTime localTime = DateTimeUtils.toLocalTimeFromUtcDate((Date) value);
final ZonedDateTime zonedDateTime = localTime.atDate(LocalDate.now()).atZone(ZoneOffset.UTC); final LocalDateTime localDateTime = localTime.atDate(LocalDate.now());
if (getDialect().isTimeZoneSet()) { if (getDialect().isTimeZoneSet()) {
return List return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectTime(localDateTime.atZone(getDatabaseTimeZone().toZoneId()))));
.of(new ValueBindDescriptor(index, getDialect().convertToCorrectDateTime(zonedDateTime.withZoneSameInstant(getDatabaseTimeZone().toZoneId()))));
} }
// NOTE // NOTE
// ---- // ----
@ -68,7 +65,7 @@ public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
// To avoid this loss in precision from the source system, the following will bind the value // To avoid this loss in precision from the source system, the following will bind the value
// as a LocalDateTime using the current date as the base in order to avoid data loss. // as a LocalDateTime using the current date as the base in order to avoid data loss.
return List.of(new ValueBindDescriptor(index, zonedDateTime)); // TODO check if this works with PreparedStatement return List.of(new ValueBindDescriptor(index, localDateTime)); // TODO check if this works with PreparedStatement
} }
throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(), throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(),

View File

@ -5,8 +5,8 @@
*/ */
package io.debezium.connector.jdbc.type.connect; package io.debezium.connector.jdbc.type.connect;
import java.time.LocalDateTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.List; import java.util.List;
import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Date;
@ -46,12 +46,13 @@ public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
return List.of(new ValueBindDescriptor(index, null)); return List.of(new ValueBindDescriptor(index, null));
} }
if (value instanceof java.util.Date) { if (value instanceof java.util.Date) {
final ZonedDateTime zonedDateTime = DateTimeUtils.toLocalDateTimeFromDate((java.util.Date) value).atZone(ZoneOffset.UTC); final LocalDateTime localDateTime = DateTimeUtils.toLocalDateTimeFromDate((java.util.Date) value);
if (getDialect().isTimeZoneSet()) { if (getDialect().isTimeZoneSet()) {
return List return List.of(new ValueBindDescriptor(index,
.of(new ValueBindDescriptor(index, getDialect().convertToCorrectDateTime(zonedDateTime.withZoneSameInstant(getDatabaseTimeZone().toZoneId())))); getDialect().convertToCorrectTimestamp(localDateTime.atZone(getDatabaseTimeZone().toZoneId())),
getDialect().getTimestampType().orElse(null)));
} }
return List.of(new ValueBindDescriptor(index, zonedDateTime)); return List.of(new ValueBindDescriptor(index, localDateTime));
} }
throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(), throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(),

View File

@ -6,9 +6,8 @@
package io.debezium.connector.jdbc.type.debezium; package io.debezium.connector.jdbc.type.debezium;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime; import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.List; import java.util.List;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
@ -48,12 +47,13 @@ public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
} }
if (value instanceof Number) { if (value instanceof Number) {
final LocalTime localTime = getLocalTime((Number) value); final LocalTime localTime = getLocalTime((Number) value);
final ZonedDateTime zonedDateTime = localTime.atDate(LocalDate.now()).atZone(ZoneOffset.UTC); final LocalDateTime localDateTime = localTime.atDate(LocalDate.now());
if (getDialect().isTimeZoneSet()) { if (getDialect().isTimeZoneSet()) {
return List return List.of(new ValueBindDescriptor(index,
.of(new ValueBindDescriptor(index, getDialect().convertToCorrectDateTime(zonedDateTime.withZoneSameInstant(getDatabaseTimeZone().toZoneId())))); getDialect().convertToCorrectTimestamp(localDateTime.atZone(getDatabaseTimeZone().toZoneId())),
getDialect().getTimestampType().orElse(null)));
} }
return List.of(new ValueBindDescriptor(index, zonedDateTime)); return List.of(new ValueBindDescriptor(index, localDateTime));
} }
throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(), throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(),
value, value.getClass().getName())); value, value.getClass().getName()));

View File

@ -6,8 +6,6 @@
package io.debezium.connector.jdbc.type.debezium; package io.debezium.connector.jdbc.type.debezium;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.List; import java.util.List;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
@ -31,15 +29,15 @@ public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
} }
if (value instanceof Number) { if (value instanceof Number) {
final ZonedDateTime zonedDateTime = getLocalDateTime(((Number) value).longValue()).atZone(ZoneOffset.UTC); final LocalDateTime localDateTime = getLocalDateTime(((Number) value).longValue());
if (getDialect().isTimeZoneSet()) { if (getDialect().isTimeZoneSet()) {
return List.of(new ValueBindDescriptor(index, return List.of(new ValueBindDescriptor(index,
getDialect().convertToCorrectDateTime(zonedDateTime.withZoneSameInstant(getDatabaseTimeZone().toZoneId())), getDialect().convertToCorrectTimestamp(localDateTime.atZone(getDatabaseTimeZone().toZoneId())),
getDialect().getTimestampType().orElse(null))); getDialect().getTimestampType().orElse(null)));
} }
return List.of(new ValueBindDescriptor(index, zonedDateTime)); return List.of(new ValueBindDescriptor(index, localDateTime));
} }
throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(), throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(),

View File

@ -74,13 +74,14 @@ public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
if (getDialect().isTimeZoneSet()) { if (getDialect().isTimeZoneSet()) {
if (getDialect().shouldBindTimeWithTimeZoneAsDatabaseTimeZone()) { if (getDialect().shouldBindTimeWithTimeZoneAsDatabaseTimeZone()) {
return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectDateTime(zdt.withZoneSameInstant(getDatabaseTimeZone().toZoneId())))); return List
.of(new ValueBindDescriptor(index, getDialect().convertToCorrectZonedTimestamp(zdt.withZoneSameInstant(getDatabaseTimeZone().toZoneId()))));
} }
// TODO check if this works with PreparedStatement // TODO check if this works with PreparedStatement
return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectDateTime(zdt), getDialect().getTimestampType().orElse(null))); return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectZonedTimestamp(zdt), getDialect().getZonedTimestampType().orElse(null)));
} }
return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectDateTime(zdt))); return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectZonedTimestamp(zdt)));
} }

View File

@ -47,7 +47,7 @@ public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
final ZonedDateTime zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId()); final ZonedDateTime zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId());
// TODO check if this works with PreparedStatement // TODO check if this works with PreparedStatement
return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectDateTime(zdt), getDialect().getTimestampType().orElse(null))); return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectZonedTimestamp(zdt), getDialect().getZonedTimestampType().orElse(null)));
} }
throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(), throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(),