From 8cd1898310949f0ca81a1017319f346f73eba01d Mon Sep 17 00:00:00 2001 From: mfvitale Date: Fri, 27 Oct 2023 17:05:31 +0200 Subject: [PATCH] DBZ-6317 Support for date and time for PostgreSQL, Oracle, MySQL, SQLServer and DB2 --- .../connector/jdbc/NativeQueryBinder.java | 7 +++- .../jdbc/PreparedStatementQueryBinder.java | 3 +- .../connector/jdbc/ValueBindDescriptor.java | 6 ++-- .../jdbc/dialect/DatabaseDialect.java | 35 ++++++++++++------- .../jdbc/dialect/GeneralDatabaseDialect.java | 12 +++++-- .../jdbc/dialect/db2/Db2DatabaseDialect.java | 29 +++++++++------ .../dialect/mysql/MySqlDatabaseDialect.java | 4 +++ .../postgres/PostgresDatabaseDialect.java | 20 +++++++++-- .../postgres/TimeWithTimezoneType.java | 34 ++++++++++++++++++ .../sqlserver/SqlServerDatabaseDialect.java | 18 ++++++---- .../jdbc/type/connect/ConnectDateType.java | 2 +- .../jdbc/type/connect/ConnectTimeType.java | 2 +- .../type/connect/ConnectTimestampType.java | 2 +- .../debezium/AbstractDebeziumTimeType.java | 4 +-- .../AbstractDebeziumTimestampType.java | 16 ++------- .../jdbc/type/debezium/ZonedTimeType.java | 35 +++++++++++-------- .../type/debezium/ZonedTimestampType.java | 23 ++++++------ 17 files changed, 168 insertions(+), 84 deletions(-) diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/NativeQueryBinder.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/NativeQueryBinder.java index 783fe83fd..e46793820 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/NativeQueryBinder.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/NativeQueryBinder.java @@ -7,6 +7,11 @@ import org.hibernate.query.BindableType; import org.hibernate.query.NativeQuery; +import org.hibernate.type.StandardBasicTypes; + +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; public class NativeQueryBinder implements QueryBinder { @@ -20,7 +25,7 @@ public NativeQueryBinder(NativeQuery binder) { public void bind(ValueBindDescriptor valueBindDescriptor) { if (valueBindDescriptor.getBindableType() != null) { - binder.setParameter(valueBindDescriptor.getIndex(), valueBindDescriptor.getValue(), (BindableType) valueBindDescriptor.getBindableType()); + binder.setParameter(valueBindDescriptor.getIndex(), ZonedDateTime.ofInstant(Instant.now(), ZoneOffset.UTC), (BindableType) StandardBasicTypes.ZONED_DATE_TIME_WITH_TIMEZONE); } else { binder.setParameter(valueBindDescriptor.getIndex(), valueBindDescriptor.getValue()); diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/PreparedStatementQueryBinder.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/PreparedStatementQueryBinder.java index e336b44f4..aa066d709 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/PreparedStatementQueryBinder.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/PreparedStatementQueryBinder.java @@ -7,7 +7,6 @@ import java.sql.PreparedStatement; import java.sql.SQLException; -import java.sql.Types; public class PreparedStatementQueryBinder implements QueryBinder { @@ -22,7 +21,7 @@ public void bind(ValueBindDescriptor valueBindDescriptor) { try { if (valueBindDescriptor.getBindableType() != null) { // TODO improve the naming - binder.setObject(valueBindDescriptor.getIndex(), valueBindDescriptor.getValue(), Types.TIMESTAMP_WITH_TIMEZONE); + binder.setObject(valueBindDescriptor.getIndex(), valueBindDescriptor.getValue(), valueBindDescriptor.getBindableType()); } else { binder.setObject(valueBindDescriptor.getIndex(), valueBindDescriptor.getValue()); diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/ValueBindDescriptor.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/ValueBindDescriptor.java index 3b755492d..b1975fc58 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/ValueBindDescriptor.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/ValueBindDescriptor.java @@ -10,7 +10,7 @@ public class ValueBindDescriptor { private final int index; private final Object value; - private final Object bindableType; + private final Integer bindableType; public ValueBindDescriptor(int index, Object value) { this.index = index; @@ -18,7 +18,7 @@ public ValueBindDescriptor(int index, Object value) { this.bindableType = null; } - public ValueBindDescriptor(int index, Object value, Object bindableType) { + public ValueBindDescriptor(int index, Object value, Integer bindableType) { this.index = index; this.value = value; this.bindableType = bindableType; @@ -32,7 +32,7 @@ public Object getValue() { return value; } - public Object getBindableType() { + public Integer getBindableType() { return bindableType; } } diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/DatabaseDialect.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/DatabaseDialect.java index 77df86713..26d1cf02b 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/DatabaseDialect.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/DatabaseDialect.java @@ -5,16 +5,6 @@ */ package io.debezium.connector.jdbc.dialect; -import java.sql.Connection; -import java.sql.SQLException; -import java.time.temporal.TemporalAccessor; -import java.util.List; -import java.util.Set; - -import org.apache.kafka.connect.data.Schema; -import org.hibernate.dialect.DatabaseVersion; -import org.hibernate.engine.jdbc.Size; - import io.debezium.connector.jdbc.SinkRecordDescriptor; import io.debezium.connector.jdbc.SinkRecordDescriptor.FieldDescriptor; import io.debezium.connector.jdbc.ValueBindDescriptor; @@ -22,6 +12,18 @@ import io.debezium.connector.jdbc.relational.TableDescriptor; import io.debezium.connector.jdbc.relational.TableId; import io.debezium.connector.jdbc.type.Type; +import org.apache.kafka.connect.data.Schema; +import org.hibernate.dialect.DatabaseVersion; +import org.hibernate.engine.jdbc.Size; + +import java.sql.Connection; +import java.sql.SQLException; +import java.time.ZonedDateTime; +import java.time.temporal.Temporal; +import java.time.temporal.TemporalAccessor; +import java.util.List; +import java.util.Optional; +import java.util.Set; /** * Represents a dialect of SQL implemented by a particular RDBMS. @@ -223,11 +225,18 @@ public interface DatabaseDialect { boolean shouldBindTimeWithTimeZoneAsDatabaseTimeZone(); /** - * Returns whether a time with time zone details is internally stored into database. + * Returns a time with timezone if is supported, without timezone if not supported by the database. * - * @return true if the value should be not zoned; false otherwise (the default). + * @return LocalDataTime if zoned date is not supported; ZonedDateTime otherwise (the default). */ - boolean isZonedTimeSupported(); + Object convertToCorrectDateTime(ZonedDateTime zonedDateTime); + + /** + * Returns a time with timezone if is supported, without timezone if not supported by the database. + * + * @return LocalDataTime if zoned date is not supported; ZonedDateTime otherwise (the default). + */ //TODO fix docs + Optional getTimestampType(); /** * Gets the maximum precision allowed for a dialect's time data type. diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java index 3545c7da1..d8e6bb300 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java @@ -10,8 +10,11 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Types; import java.time.ZoneOffset; +import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; +import java.time.temporal.Temporal; import java.time.temporal.TemporalAccessor; import java.util.ArrayList; import java.util.Collection; @@ -630,8 +633,13 @@ protected void registerTypes() { } @Override - public boolean isZonedTimeSupported() { - return true; + public Object convertToCorrectDateTime(ZonedDateTime zonedTime) { + return zonedTime; + } + + @Override + public Optional getTimestampType() { + return Optional.of(Types.TIMESTAMP_WITH_TIMEZONE); } protected void registerType(Type type) { diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/db2/Db2DatabaseDialect.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/db2/Db2DatabaseDialect.java index d7b5d4a11..2747cd945 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/db2/Db2DatabaseDialect.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/db2/Db2DatabaseDialect.java @@ -5,16 +5,6 @@ */ package io.debezium.connector.jdbc.dialect.db2; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; -import java.time.format.DateTimeFormatterBuilder; -import java.time.temporal.TemporalAccessor; -import java.util.Optional; - -import org.hibernate.SessionFactory; -import org.hibernate.dialect.DB2Dialect; -import org.hibernate.dialect.Dialect; - import io.debezium.connector.jdbc.JdbcSinkConnectorConfig; import io.debezium.connector.jdbc.SinkRecordDescriptor; import io.debezium.connector.jdbc.SinkRecordDescriptor.FieldDescriptor; @@ -24,6 +14,16 @@ import io.debezium.connector.jdbc.dialect.SqlStatementBuilder; import io.debezium.connector.jdbc.relational.TableDescriptor; import io.debezium.time.ZonedTimestamp; +import org.hibernate.SessionFactory; +import org.hibernate.dialect.DB2Dialect; +import org.hibernate.dialect.Dialect; + +import java.sql.Timestamp; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.time.temporal.TemporalAccessor; +import java.util.Optional; /** * A {@link DatabaseDialect} implementation for Db2. @@ -199,4 +199,13 @@ public String getTruncateStatement(TableDescriptor table) { builder.append(" ACTIVATE NOT LOGGED INITIALLY WITH EMPTY TABLE"); return builder.build(); } + @Override + public Optional getTimestampType() { + return Optional.empty(); + } + + @Override + public Object convertToCorrectDateTime(ZonedDateTime zonedTime) { + return Timestamp.valueOf(zonedTime.toLocalDateTime()); + } } diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/mysql/MySqlDatabaseDialect.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/mysql/MySqlDatabaseDialect.java index d7988da86..e35ebfe5d 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/mysql/MySqlDatabaseDialect.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/mysql/MySqlDatabaseDialect.java @@ -189,4 +189,8 @@ protected void addColumnDefaultValue(SinkRecordDescriptor.FieldDescriptor field, super.addColumnDefaultValue(field, columnSpec); } + @Override + public Optional getTimestampType() { + return Optional.empty(); + } } diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/postgres/PostgresDatabaseDialect.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/postgres/PostgresDatabaseDialect.java index 1b4f86103..4540f3b3e 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/postgres/PostgresDatabaseDialect.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/postgres/PostgresDatabaseDialect.java @@ -7,7 +7,10 @@ import java.sql.Connection; import java.sql.SQLException; +import java.sql.Types; +import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; +import java.time.temporal.Temporal; import java.time.temporal.TemporalAccessor; import java.util.Optional; @@ -212,7 +215,20 @@ protected String resolveColumnNameFromField(String fieldName) { } @Override - public boolean isZonedTimeSupported() { - return false; + public Temporal convertToCorrectDateTime(ZonedDateTime zonedTime) { + // TODO tested only with PostgreSQL check with others Databases + // For timestamp with time zone, the internally stored value is always in UTC (Universal Coordinated Time, traditionally known as Greenwich Mean Time, GMT). + // An input value that has an explicit time zone specified is converted to UTC using the appropriate offset for that time zone. + // If no time zone is stated in the input string, then it is assumed to be in the time zone indicated by the system's TimeZone parameter, + // and is converted to UTC using the offset for the timezone zone. + // + // When a timestamp with time zone value is output, it is always converted from UTC to the current timezone zone, and displayed as local time in that zone. + // https://www.postgresql.org/docs/current/datatype-datetime.html + return zonedTime.toLocalDateTime(); + } + + @Override + public Optional getTimestampType() { + return Optional.of(Types.TIMESTAMP); } } diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/postgres/TimeWithTimezoneType.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/postgres/TimeWithTimezoneType.java index d6da9cbba..0aecc39f0 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/postgres/TimeWithTimezoneType.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/postgres/TimeWithTimezoneType.java @@ -5,9 +5,17 @@ */ package io.debezium.connector.jdbc.dialect.postgres; +import io.debezium.connector.jdbc.ValueBindDescriptor; import io.debezium.connector.jdbc.type.Type; import io.debezium.connector.jdbc.type.debezium.ZonedTimeType; import io.debezium.time.ZonedTime; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.ConnectException; + +import java.time.LocalDate; +import java.time.OffsetTime; +import java.time.ZonedDateTime; +import java.util.List; /** * An implementation of {@link Type} for {@link ZonedTime} types for PostgreSQL. @@ -23,4 +31,30 @@ public String[] getRegistrationKeys() { return new String[]{ ZonedTime.SCHEMA_NAME }; } + @Override + public List bind(int index, Schema schema, Object value) { + + if (value == null) { + return List.of(new ValueBindDescriptor(index, null)); + } + + if (value instanceof String) { + + final ZonedDateTime zdt = OffsetTime.parse((String) value, ZonedTime.FORMATTER).atDate(LocalDate.now()).toZonedDateTime(); + + if (getDialect().isTimeZoneSet()) { + if (getDialect().shouldBindTimeWithTimeZoneAsDatabaseTimeZone()) { + return List.of(new ValueBindDescriptor(index, zdt.withZoneSameInstant(getDatabaseTimeZone().toZoneId()))); + } + // TODO check if this works with PreparedStatement + return List.of(new ValueBindDescriptor(index, zdt.toOffsetDateTime().toOffsetTime())); + } + + return List.of(new ValueBindDescriptor(index, zdt)); + + } + + throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(), + value, value.getClass().getName())); + } } diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/sqlserver/SqlServerDatabaseDialect.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/sqlserver/SqlServerDatabaseDialect.java index 8b919f7e0..31a11f9a8 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/sqlserver/SqlServerDatabaseDialect.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/sqlserver/SqlServerDatabaseDialect.java @@ -5,12 +5,6 @@ */ package io.debezium.connector.jdbc.dialect.sqlserver; -import java.util.Optional; - -import org.hibernate.SessionFactory; -import org.hibernate.dialect.Dialect; -import org.hibernate.dialect.SQLServerDialect; - import io.debezium.connector.jdbc.JdbcSinkConnectorConfig; import io.debezium.connector.jdbc.SinkRecordDescriptor; import io.debezium.connector.jdbc.dialect.DatabaseDialect; @@ -18,6 +12,13 @@ import io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect; import io.debezium.connector.jdbc.dialect.SqlStatementBuilder; import io.debezium.connector.jdbc.relational.TableDescriptor; +import org.hibernate.SessionFactory; +import org.hibernate.dialect.Dialect; +import org.hibernate.dialect.SQLServerDialect; + +import java.time.ZonedDateTime; +import java.time.temporal.Temporal; +import java.util.Optional; /** * A {@link DatabaseDialect} implementation for SQL Server. @@ -150,4 +151,9 @@ public String getByteArrayFormat() { return "CONVERT(VARBINARY, '0x%s')"; } + @Override + public Temporal convertToCorrectDateTime(ZonedDateTime zonedTime) { + + return zonedTime.toOffsetDateTime(); + } } diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/connect/ConnectDateType.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/connect/ConnectDateType.java index 20fe9310d..99019deb7 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/connect/ConnectDateType.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/connect/ConnectDateType.java @@ -43,7 +43,7 @@ public List bind(int index, Schema schema, Object value) { return List.of(new ValueBindDescriptor(index, null)); } if (value instanceof java.util.Date) { - return List.of(new ValueBindDescriptor(index, DateTimeUtils.toLocalDateFromDate((java.util.Date) value))); + return List.of(new ValueBindDescriptor(index, DateTimeUtils.toLocalDateFromDate((java.util.Date) value))); //TODO add a get Date as for getFormattedDate } throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(), diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/connect/ConnectTimeType.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/connect/ConnectTimeType.java index 55107a457..fb1f29894 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/connect/ConnectTimeType.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/connect/ConnectTimeType.java @@ -56,7 +56,7 @@ public List bind(int index, Schema schema, Object value) { final LocalTime localTime = DateTimeUtils.toLocalTimeFromUtcDate((Date) value); final LocalDateTime localDateTime = localTime.atDate(LocalDate.now()); if (getDialect().isTimeZoneSet()) { - return List.of(new ValueBindDescriptor(index, localDateTime.atZone(getDatabaseTimeZone().toZoneId()))); + return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectDateTime(localDateTime.atZone(getDatabaseTimeZone().toZoneId())))); } // NOTE // ---- diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/connect/ConnectTimestampType.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/connect/ConnectTimestampType.java index a3cb16178..43aa14e9c 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/connect/ConnectTimestampType.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/connect/ConnectTimestampType.java @@ -48,7 +48,7 @@ public List bind(int index, Schema schema, Object value) { if (value instanceof java.util.Date) { final LocalDateTime localDateTime = DateTimeUtils.toLocalDateTimeFromDate((java.util.Date) value); if (getDialect().isTimeZoneSet()) { - return List.of(new ValueBindDescriptor(index, localDateTime.atZone(getDatabaseTimeZone().toZoneId()))); + return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectDateTime(localDateTime.atZone(getDatabaseTimeZone().toZoneId())))); } return List.of(new ValueBindDescriptor(index, localDateTime)); } diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/debezium/AbstractDebeziumTimeType.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/debezium/AbstractDebeziumTimeType.java index d6b134f27..18915dd08 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/debezium/AbstractDebeziumTimeType.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/debezium/AbstractDebeziumTimeType.java @@ -49,12 +49,12 @@ public List bind(int index, Schema schema, Object value) { final LocalTime localTime = getLocalTime((Number) value); final LocalDateTime localDateTime = localTime.atDate(LocalDate.now()); if (getDialect().isTimeZoneSet()) { - return List.of(new ValueBindDescriptor(index, localDateTime.atZone(getDatabaseTimeZone().toZoneId()))); + return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectDateTime(localDateTime.atZone(getDatabaseTimeZone().toZoneId())))); } return List.of(new ValueBindDescriptor(index, localDateTime)); } throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(), - value.toString(), value.getClass().getName())); + value, value.getClass().getName())); } protected abstract LocalTime getLocalTime(Number value); diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/debezium/AbstractDebeziumTimestampType.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/debezium/AbstractDebeziumTimestampType.java index f7cc97cf6..b4fec17e0 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/debezium/AbstractDebeziumTimestampType.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/debezium/AbstractDebeziumTimestampType.java @@ -30,21 +30,9 @@ public List bind(int index, Schema schema, Object value) { if (value instanceof Number) { final LocalDateTime localDateTime = getLocalDateTime(((Number) value).longValue()); + if (getDialect().isTimeZoneSet()) { - - if (getDialect().isZonedTimeSupported()) { - - return List.of(new ValueBindDescriptor(index, localDateTime.atZone(getDatabaseTimeZone().toZoneId()))); - } - // TODO tested only with PostgreSQL check with others Databases - // For timestamp with time zone, the internally stored value is always in UTC (Universal Coordinated Time, traditionally known as Greenwich Mean Time, GMT). - // An input value that has an explicit time zone specified is converted to UTC using the appropriate offset for that time zone. - // If no time zone is stated in the input string, then it is assumed to be in the time zone indicated by the system's TimeZone parameter, - // and is converted to UTC using the offset for the timezone zone. - // - // When a timestamp with time zone value is output, it is always converted from UTC to the current timezone zone, and displayed as local time in that zone. - // https://www.postgresql.org/docs/current/datatype-datetime.html - return List.of(new ValueBindDescriptor(index, localDateTime.atZone(getDatabaseTimeZone().toZoneId()).toOffsetDateTime())); + return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectDateTime(localDateTime.atZone(getDatabaseTimeZone().toZoneId())))); } return List.of(new ValueBindDescriptor(index, localDateTime)); diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/debezium/ZonedTimeType.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/debezium/ZonedTimeType.java index d05679e2a..e5dc87b45 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/debezium/ZonedTimeType.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/debezium/ZonedTimeType.java @@ -5,22 +5,20 @@ */ package io.debezium.connector.jdbc.type.debezium; -import java.sql.Types; -import java.time.LocalDate; -import java.time.OffsetTime; -import java.time.ZonedDateTime; -import java.util.List; - -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.errors.ConnectException; -import org.hibernate.engine.jdbc.Size; -import org.hibernate.type.StandardBasicTypes; - import io.debezium.connector.jdbc.ValueBindDescriptor; import io.debezium.connector.jdbc.dialect.DatabaseDialect; import io.debezium.connector.jdbc.type.AbstractTimeType; import io.debezium.connector.jdbc.type.Type; import io.debezium.time.ZonedTime; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.ConnectException; +import org.hibernate.engine.jdbc.Size; + +import java.sql.Types; +import java.time.LocalDate; +import java.time.OffsetTime; +import java.time.ZonedDateTime; +import java.util.List; /** * An implementation of {@link Type} for {@link ZonedTime} values. @@ -65,19 +63,26 @@ public String getDefaultValueBinding(DatabaseDialect dialect, Schema schema, Obj @Override public List bind(int index, Schema schema, Object value) { + if (value == null) { return List.of(new ValueBindDescriptor(index, null)); } + if (value instanceof String) { final ZonedDateTime zdt = OffsetTime.parse((String) value, ZonedTime.FORMATTER).atDate(LocalDate.now()).toZonedDateTime(); + if (getDialect().isTimeZoneSet()) { if (getDialect().shouldBindTimeWithTimeZoneAsDatabaseTimeZone()) { - return List.of(new ValueBindDescriptor(index, zdt.withZoneSameInstant(getDatabaseTimeZone().toZoneId()))); + return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectDateTime(zdt.withZoneSameInstant(getDatabaseTimeZone().toZoneId())))); } // TODO check if this works with PreparedStatement - return List.of(new ValueBindDescriptor(index, zdt, StandardBasicTypes.ZONED_DATE_TIME_WITH_TIMEZONE)); + if (getDialect().getTimestampType().isPresent()) { + + return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectDateTime(zdt), getDialect().getTimestampType().get())); + } } - return List.of(new ValueBindDescriptor(index, zdt)); + return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectDateTime(zdt))); + } throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(), @@ -85,6 +90,6 @@ public List bind(int index, Schema schema, Object value) { } protected int getJdbcType(DatabaseDialect dialect) { - return Types.TIME_WITH_TIMEZONE; + return Types.TIME_WITH_TIMEZONE; //TODO use this for timestamp type? } } diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/debezium/ZonedTimestampType.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/debezium/ZonedTimestampType.java index fa60b6b89..f8c792a8a 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/debezium/ZonedTimestampType.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/type/debezium/ZonedTimestampType.java @@ -5,19 +5,17 @@ */ package io.debezium.connector.jdbc.type.debezium; -import java.sql.Types; -import java.time.ZonedDateTime; -import java.util.List; - -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.errors.ConnectException; -import org.hibernate.type.StandardBasicTypes; - import io.debezium.connector.jdbc.ValueBindDescriptor; import io.debezium.connector.jdbc.dialect.DatabaseDialect; import io.debezium.connector.jdbc.type.AbstractTimestampType; import io.debezium.connector.jdbc.type.Type; import io.debezium.time.ZonedTimestamp; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.ConnectException; + +import java.sql.Types; +import java.time.ZonedDateTime; +import java.util.List; /** * An implementation of {@link Type} for {@link ZonedTimestamp} values. @@ -45,10 +43,13 @@ public List bind(int index, Schema schema, Object value) { return List.of(new ValueBindDescriptor(index, null)); } if (value instanceof String) { - final ZonedDateTime zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER) - .withZoneSameInstant(getDatabaseTimeZone().toZoneId()); + final ZonedDateTime zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId()); // TODO check if this works with PreparedStatement - return List.of(new ValueBindDescriptor(index, zdt, StandardBasicTypes.ZONED_DATE_TIME_WITH_TIMEZONE)); + if(getDialect().getTimestampType().isPresent()) { + return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectDateTime(zdt), getDialect().getTimestampType().get())); + } + + return List.of(new ValueBindDescriptor(index, getDialect().convertToCorrectDateTime(zdt))); } throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(),