DBZ-6317 Support for date and time for PostgreSQL, Oracle, MySQL, SQLServer and DB2

This commit is contained in:
mfvitale 2023-10-27 17:05:31 +02:00 committed by Jiri Pechanec
parent 8557c0752b
commit 8cd1898310
17 changed files with 168 additions and 84 deletions

View File

@ -7,6 +7,11 @@
import org.hibernate.query.BindableType;
import org.hibernate.query.NativeQuery;
import org.hibernate.type.StandardBasicTypes;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
public class NativeQueryBinder implements QueryBinder {
@ -20,7 +25,7 @@ public NativeQueryBinder(NativeQuery<?> binder) {
public void bind(ValueBindDescriptor valueBindDescriptor) {
if (valueBindDescriptor.getBindableType() != null) {
binder.setParameter(valueBindDescriptor.getIndex(), valueBindDescriptor.getValue(), (BindableType) valueBindDescriptor.getBindableType());
binder.setParameter(valueBindDescriptor.getIndex(), ZonedDateTime.ofInstant(Instant.now(), ZoneOffset.UTC), (BindableType) StandardBasicTypes.ZONED_DATE_TIME_WITH_TIMEZONE);
}
else {
binder.setParameter(valueBindDescriptor.getIndex(), valueBindDescriptor.getValue());

View File

@ -7,7 +7,6 @@
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
public class PreparedStatementQueryBinder implements QueryBinder {
@ -22,7 +21,7 @@ public void bind(ValueBindDescriptor valueBindDescriptor) {
try {
if (valueBindDescriptor.getBindableType() != null) { // TODO improve the naming
binder.setObject(valueBindDescriptor.getIndex(), valueBindDescriptor.getValue(), Types.TIMESTAMP_WITH_TIMEZONE);
binder.setObject(valueBindDescriptor.getIndex(), valueBindDescriptor.getValue(), valueBindDescriptor.getBindableType());
}
else {
binder.setObject(valueBindDescriptor.getIndex(), valueBindDescriptor.getValue());

View File

@ -10,7 +10,7 @@ public class ValueBindDescriptor {
private final int index;
private final Object value;
private final Object bindableType;
private final Integer bindableType;
public ValueBindDescriptor(int index, Object value) {
this.index = index;
@ -18,7 +18,7 @@ public ValueBindDescriptor(int index, Object value) {
this.bindableType = null;
}
public ValueBindDescriptor(int index, Object value, Object bindableType) {
public ValueBindDescriptor(int index, Object value, Integer bindableType) {
this.index = index;
this.value = value;
this.bindableType = bindableType;
@ -32,7 +32,7 @@ public Object getValue() {
return value;
}
public Object getBindableType() {
public Integer getBindableType() {
return bindableType;
}
}

View File

@ -5,16 +5,6 @@
*/
package io.debezium.connector.jdbc.dialect;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.temporal.TemporalAccessor;
import java.util.List;
import java.util.Set;
import org.apache.kafka.connect.data.Schema;
import org.hibernate.dialect.DatabaseVersion;
import org.hibernate.engine.jdbc.Size;
import io.debezium.connector.jdbc.SinkRecordDescriptor;
import io.debezium.connector.jdbc.SinkRecordDescriptor.FieldDescriptor;
import io.debezium.connector.jdbc.ValueBindDescriptor;
@ -22,6 +12,18 @@
import io.debezium.connector.jdbc.relational.TableDescriptor;
import io.debezium.connector.jdbc.relational.TableId;
import io.debezium.connector.jdbc.type.Type;
import org.apache.kafka.connect.data.Schema;
import org.hibernate.dialect.DatabaseVersion;
import org.hibernate.engine.jdbc.Size;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.ZonedDateTime;
import java.time.temporal.Temporal;
import java.time.temporal.TemporalAccessor;
import java.util.List;
import java.util.Optional;
import java.util.Set;
/**
* Represents a dialect of SQL implemented by a particular RDBMS.
@ -223,11 +225,18 @@ public interface DatabaseDialect {
boolean shouldBindTimeWithTimeZoneAsDatabaseTimeZone();
/**
* Returns whether a time with time zone details is internally stored into database.
* Returns a time with timezone if is supported, without timezone if not supported by the database.
*
* @return true if the value should be not zoned; false otherwise (the default).
* @return LocalDataTime if zoned date is not supported; ZonedDateTime otherwise (the default).
*/
boolean isZonedTimeSupported();
Object convertToCorrectDateTime(ZonedDateTime zonedDateTime);
/**
* Returns a time with timezone if is supported, without timezone if not supported by the database.
*
* @return LocalDataTime if zoned date is not supported; ZonedDateTime otherwise (the default).
*/ //TODO fix docs
Optional<Integer> getTimestampType();
/**
* Gets the maximum precision allowed for a dialect's time data type.

View File

@ -10,8 +10,11 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.Temporal;
import java.time.temporal.TemporalAccessor;
import java.util.ArrayList;
import java.util.Collection;
@ -630,8 +633,13 @@ protected void registerTypes() {
}
@Override
public boolean isZonedTimeSupported() {
return true;
public Object convertToCorrectDateTime(ZonedDateTime zonedTime) {
return zonedTime;
}
@Override
public Optional<Integer> getTimestampType() {
return Optional.of(Types.TIMESTAMP_WITH_TIMEZONE);
}
protected void registerType(Type type) {

View File

@ -5,16 +5,6 @@
*/
package io.debezium.connector.jdbc.dialect.db2;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.TemporalAccessor;
import java.util.Optional;
import org.hibernate.SessionFactory;
import org.hibernate.dialect.DB2Dialect;
import org.hibernate.dialect.Dialect;
import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
import io.debezium.connector.jdbc.SinkRecordDescriptor;
import io.debezium.connector.jdbc.SinkRecordDescriptor.FieldDescriptor;
@ -24,6 +14,16 @@
import io.debezium.connector.jdbc.dialect.SqlStatementBuilder;
import io.debezium.connector.jdbc.relational.TableDescriptor;
import io.debezium.time.ZonedTimestamp;
import org.hibernate.SessionFactory;
import org.hibernate.dialect.DB2Dialect;
import org.hibernate.dialect.Dialect;
import java.sql.Timestamp;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.TemporalAccessor;
import java.util.Optional;
/**
* A {@link DatabaseDialect} implementation for Db2.
@ -199,4 +199,13 @@ public String getTruncateStatement(TableDescriptor table) {
builder.append(" ACTIVATE NOT LOGGED INITIALLY WITH EMPTY TABLE");
return builder.build();
}
@Override
public Optional<Integer> getTimestampType() {
return Optional.empty();
}
@Override
public Object convertToCorrectDateTime(ZonedDateTime zonedTime) {
return Timestamp.valueOf(zonedTime.toLocalDateTime());
}
}

View File

@ -189,4 +189,8 @@ protected void addColumnDefaultValue(SinkRecordDescriptor.FieldDescriptor field,
super.addColumnDefaultValue(field, columnSpec);
}
@Override
public Optional<Integer> getTimestampType() {
return Optional.empty();
}
}

View File

@ -7,7 +7,10 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Types;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.Temporal;
import java.time.temporal.TemporalAccessor;
import java.util.Optional;
@ -212,7 +215,20 @@ protected String resolveColumnNameFromField(String fieldName) {
}
@Override
public boolean isZonedTimeSupported() {
return false;
public Temporal convertToCorrectDateTime(ZonedDateTime zonedTime) {
// TODO tested only with PostgreSQL check with others Databases
// For timestamp with time zone, the internally stored value is always in UTC (Universal Coordinated Time, traditionally known as Greenwich Mean Time, GMT).
// An input value that has an explicit time zone specified is converted to UTC using the appropriate offset for that time zone.
// If no time zone is stated in the input string, then it is assumed to be in the time zone indicated by the system's TimeZone parameter,
// and is converted to UTC using the offset for the timezone zone.
//
// When a timestamp with time zone value is output, it is always converted from UTC to the current timezone zone, and displayed as local time in that zone.
// https://www.postgresql.org/docs/current/datatype-datetime.html
return zonedTime.toLocalDateTime();
}
@Override
public Optional<Integer> getTimestampType() {
return Optional.of(Types.TIMESTAMP);
}
}

View File

@ -5,9 +5,17 @@
*/
package io.debezium.connector.jdbc.dialect.postgres;
import io.debezium.connector.jdbc.ValueBindDescriptor;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.connector.jdbc.type.debezium.ZonedTimeType;
import io.debezium.time.ZonedTime;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import java.time.LocalDate;
import java.time.OffsetTime;
import java.time.ZonedDateTime;
import java.util.List;
/**
* An implementation of {@link Type} for {@link ZonedTime} types for PostgreSQL.
@ -23,4 +31,30 @@ public String[] getRegistrationKeys() {
return new String[]{ ZonedTime.SCHEMA_NAME };
}
@Override
public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
if (value == null) {
return List.of(new ValueBindDescriptor(index, null));
}
if (value instanceof String) {
final ZonedDateTime zdt = OffsetTime.parse((String) value, ZonedTime.FORMATTER).atDate(LocalDate.now()).toZonedDateTime();
if (getDialect().isTimeZoneSet()) {
if (getDialect().shouldBindTimeWithTimeZoneAsDatabaseTimeZone()) {
return List.of(new ValueBindDescriptor(index, zdt.withZoneSameInstant(getDatabaseTimeZone().toZoneId())));
}
// TODO check if this works with PreparedStatement
return List.of(new ValueBindDescriptor(index, zdt.toOffsetDateTime().toOffsetTime()));
}
return List.of(new ValueBindDescriptor(index, zdt));
}
throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(),
value, value.getClass().getName()));
}
}

View File

@ -5,12 +5,6 @@
*/
package io.debezium.connector.jdbc.dialect.sqlserver;
import java.util.Optional;
import org.hibernate.SessionFactory;
import org.hibernate.dialect.Dialect;
import org.hibernate.dialect.SQLServerDialect;
import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
import io.debezium.connector.jdbc.SinkRecordDescriptor;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
@ -18,6 +12,13 @@
import io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect;
import io.debezium.connector.jdbc.dialect.SqlStatementBuilder;
import io.debezium.connector.jdbc.relational.TableDescriptor;
import org.hibernate.SessionFactory;
import org.hibernate.dialect.Dialect;
import org.hibernate.dialect.SQLServerDialect;
import java.time.ZonedDateTime;
import java.time.temporal.Temporal;
import java.util.Optional;
/**
* A {@link DatabaseDialect} implementation for SQL Server.
@ -150,4 +151,9 @@ public String getByteArrayFormat() {
return "CONVERT(VARBINARY, '0x%s')";
}
@Override
public Temporal convertToCorrectDateTime(ZonedDateTime zonedTime) {
return zonedTime.toOffsetDateTime();
}
}

View File

@ -43,7 +43,7 @@ public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
return List.of(new ValueBindDescriptor(index, null));
}
if (value instanceof java.util.Date) {
return List.of(new ValueBindDescriptor(index, DateTimeUtils.toLocalDateFromDate((java.util.Date) value)));
return List.of(new ValueBindDescriptor(index, DateTimeUtils.toLocalDateFromDate((java.util.Date) value))); //TODO add a get Date as for getFormattedDate
}
throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(),

View File

@ -56,7 +56,7 @@ public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
final LocalTime localTime = DateTimeUtils.toLocalTimeFromUtcDate((Date) value);
final LocalDateTime localDateTime = localTime.atDate(LocalDate.now());
if (getDialect().isTimeZoneSet()) {
return List.of(new ValueBindDescriptor(index, localDateTime.atZone(getDatabaseTimeZone().toZoneId())));
return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectDateTime(localDateTime.atZone(getDatabaseTimeZone().toZoneId()))));
}
// NOTE
// ----

View File

@ -48,7 +48,7 @@ public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
if (value instanceof java.util.Date) {
final LocalDateTime localDateTime = DateTimeUtils.toLocalDateTimeFromDate((java.util.Date) value);
if (getDialect().isTimeZoneSet()) {
return List.of(new ValueBindDescriptor(index, localDateTime.atZone(getDatabaseTimeZone().toZoneId())));
return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectDateTime(localDateTime.atZone(getDatabaseTimeZone().toZoneId()))));
}
return List.of(new ValueBindDescriptor(index, localDateTime));
}

View File

@ -49,12 +49,12 @@ public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
final LocalTime localTime = getLocalTime((Number) value);
final LocalDateTime localDateTime = localTime.atDate(LocalDate.now());
if (getDialect().isTimeZoneSet()) {
return List.of(new ValueBindDescriptor(index, localDateTime.atZone(getDatabaseTimeZone().toZoneId())));
return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectDateTime(localDateTime.atZone(getDatabaseTimeZone().toZoneId()))));
}
return List.of(new ValueBindDescriptor(index, localDateTime));
}
throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(),
value.toString(), value.getClass().getName()));
value, value.getClass().getName()));
}
protected abstract LocalTime getLocalTime(Number value);

View File

@ -30,21 +30,9 @@ public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
if (value instanceof Number) {
final LocalDateTime localDateTime = getLocalDateTime(((Number) value).longValue());
if (getDialect().isTimeZoneSet()) {
if (getDialect().isZonedTimeSupported()) {
return List.of(new ValueBindDescriptor(index, localDateTime.atZone(getDatabaseTimeZone().toZoneId())));
}
// TODO tested only with PostgreSQL check with others Databases
// For timestamp with time zone, the internally stored value is always in UTC (Universal Coordinated Time, traditionally known as Greenwich Mean Time, GMT).
// An input value that has an explicit time zone specified is converted to UTC using the appropriate offset for that time zone.
// If no time zone is stated in the input string, then it is assumed to be in the time zone indicated by the system's TimeZone parameter,
// and is converted to UTC using the offset for the timezone zone.
//
// When a timestamp with time zone value is output, it is always converted from UTC to the current timezone zone, and displayed as local time in that zone.
// https://www.postgresql.org/docs/current/datatype-datetime.html
return List.of(new ValueBindDescriptor(index, localDateTime.atZone(getDatabaseTimeZone().toZoneId()).toOffsetDateTime()));
return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectDateTime(localDateTime.atZone(getDatabaseTimeZone().toZoneId()))));
}
return List.of(new ValueBindDescriptor(index, localDateTime));

View File

@ -5,22 +5,20 @@
*/
package io.debezium.connector.jdbc.type.debezium;
import java.sql.Types;
import java.time.LocalDate;
import java.time.OffsetTime;
import java.time.ZonedDateTime;
import java.util.List;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.hibernate.engine.jdbc.Size;
import org.hibernate.type.StandardBasicTypes;
import io.debezium.connector.jdbc.ValueBindDescriptor;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.type.AbstractTimeType;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.time.ZonedTime;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.hibernate.engine.jdbc.Size;
import java.sql.Types;
import java.time.LocalDate;
import java.time.OffsetTime;
import java.time.ZonedDateTime;
import java.util.List;
/**
* An implementation of {@link Type} for {@link ZonedTime} values.
@ -65,19 +63,26 @@ public String getDefaultValueBinding(DatabaseDialect dialect, Schema schema, Obj
@Override
public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
if (value == null) {
return List.of(new ValueBindDescriptor(index, null));
}
if (value instanceof String) {
final ZonedDateTime zdt = OffsetTime.parse((String) value, ZonedTime.FORMATTER).atDate(LocalDate.now()).toZonedDateTime();
if (getDialect().isTimeZoneSet()) {
if (getDialect().shouldBindTimeWithTimeZoneAsDatabaseTimeZone()) {
return List.of(new ValueBindDescriptor(index, zdt.withZoneSameInstant(getDatabaseTimeZone().toZoneId())));
return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectDateTime(zdt.withZoneSameInstant(getDatabaseTimeZone().toZoneId()))));
}
// TODO check if this works with PreparedStatement
return List.of(new ValueBindDescriptor(index, zdt, StandardBasicTypes.ZONED_DATE_TIME_WITH_TIMEZONE));
if (getDialect().getTimestampType().isPresent()) {
return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectDateTime(zdt), getDialect().getTimestampType().get()));
}
}
return List.of(new ValueBindDescriptor(index, zdt));
return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectDateTime(zdt)));
}
throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(),
@ -85,6 +90,6 @@ public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
}
protected int getJdbcType(DatabaseDialect dialect) {
return Types.TIME_WITH_TIMEZONE;
return Types.TIME_WITH_TIMEZONE; //TODO use this for timestamp type?
}
}

View File

@ -5,19 +5,17 @@
*/
package io.debezium.connector.jdbc.type.debezium;
import java.sql.Types;
import java.time.ZonedDateTime;
import java.util.List;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.hibernate.type.StandardBasicTypes;
import io.debezium.connector.jdbc.ValueBindDescriptor;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.type.AbstractTimestampType;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.time.ZonedTimestamp;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import java.sql.Types;
import java.time.ZonedDateTime;
import java.util.List;
/**
* An implementation of {@link Type} for {@link ZonedTimestamp} values.
@ -45,10 +43,13 @@ public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
return List.of(new ValueBindDescriptor(index, null));
}
if (value instanceof String) {
final ZonedDateTime zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER)
.withZoneSameInstant(getDatabaseTimeZone().toZoneId());
final ZonedDateTime zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId());
// TODO check if this works with PreparedStatement
return List.of(new ValueBindDescriptor(index, zdt, StandardBasicTypes.ZONED_DATE_TIME_WITH_TIMEZONE));
if(getDialect().getTimestampType().isPresent()) {
return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectDateTime(zdt), getDialect().getTimestampType().get()));
}
return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectDateTime(zdt)));
}
throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(),