From 4fd96e86af7a267784f4be13e1d05fd31fcede33 Mon Sep 17 00:00:00 2001 From: mfvitale Date: Thu, 27 Jun 2024 14:54:30 +0200 Subject: [PATCH] DBZ-7920 Add support for timestamp infinity values for Oracle --- .../jdbc/dialect/GeneralDatabaseDialect.java | 2 +- .../dialect/oracle/OracleDatabaseDialect.java | 10 +++++ .../dialect/oracle/ZonedTimestampType.java | 44 ++++++------------- .../jdbc/e2e/AbstractJdbcSinkPipelineIT.java | 36 ++++++++++----- 4 files changed, 50 insertions(+), 42 deletions(-) diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java index f212b055a..91059b5ff 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java @@ -11,8 +11,8 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Timestamp; -import java.time.Instant; import java.sql.Types; +import java.time.Instant; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.time.temporal.TemporalAccessor; diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/oracle/OracleDatabaseDialect.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/oracle/OracleDatabaseDialect.java index 425916b3c..3a5bdbccd 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/oracle/OracleDatabaseDialect.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/oracle/OracleDatabaseDialect.java @@ -151,6 +151,16 @@ public String getFormattedTimestampWithTimeZone(String value) { return String.format(TO_TIMESTAMP_FF9_TZ, value); } + @Override + public String getTimestampPositiveInfinityValue() { + return "9999-12-31T23:59:59+00:00"; + } + + @Override + public String getTimestampNegativeInfinityValue() { + return "-4712-01-01T00:00:00+00:00"; + } + @Override protected String resolveColumnNameFromField(String fieldName) { String columnName = super.resolveColumnNameFromField(fieldName); diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/oracle/ZonedTimestampType.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/oracle/ZonedTimestampType.java index a20b527ae..31206f675 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/oracle/ZonedTimestampType.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/oracle/ZonedTimestampType.java @@ -5,16 +5,10 @@ */ package io.debezium.connector.jdbc.dialect.oracle; -import java.sql.Types; import java.time.ZonedDateTime; import java.util.List; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.errors.ConnectException; - import io.debezium.connector.jdbc.ValueBindDescriptor; -import io.debezium.connector.jdbc.dialect.DatabaseDialect; -import io.debezium.connector.jdbc.type.AbstractTimestampType; import io.debezium.connector.jdbc.type.Type; import io.debezium.time.ZonedTimestamp; @@ -23,40 +17,28 @@ * * @author Chris Cranford */ -public class ZonedTimestampType extends AbstractTimestampType { +public class ZonedTimestampType extends io.debezium.connector.jdbc.type.debezium.ZonedTimestampType { public static final ZonedTimestampType INSTANCE = new ZonedTimestampType(); - @Override - public String[] getRegistrationKeys() { - return new String[]{ ZonedTimestamp.SCHEMA_NAME }; - } + protected List infinityTimestampValue(int index, Object value) { + final ZonedDateTime zdt; - @Override - public String getDefaultValueBinding(DatabaseDialect dialect, Schema schema, Object value) { - return dialect.getFormattedTimestampWithTimeZone((String) value); - } - - @Override - public List bind(int index, Schema schema, Object value) { - - if (value == null) { - return List.of(new ValueBindDescriptor(index, null)); + if (POSITIVE_INFINITY.equals(value)) { + zdt = ZonedDateTime.parse(getDialect().getTimestampPositiveInfinityValue(), ZonedTimestamp.FORMATTER); } - if (value instanceof String) { - - final ZonedDateTime zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId()); - - return List.of(new ValueBindDescriptor(index, zdt, getJdbcType())); + else { + zdt = ZonedDateTime.parse(getDialect().getTimestampNegativeInfinityValue(), ZonedTimestamp.FORMATTER); } - throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(), - value, value.getClass().getName())); + return List.of(new ValueBindDescriptor(index, zdt, getJdbcBindType())); } @Override - protected int getJdbcType() { - return Types.TIMESTAMP_WITH_TIMEZONE; - } + protected List normalTimestampValue(int index, Object value) { + final ZonedDateTime zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId()); + + return List.of(new ValueBindDescriptor(index, zdt, getJdbcBindType())); + } } diff --git a/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/e2e/AbstractJdbcSinkPipelineIT.java b/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/e2e/AbstractJdbcSinkPipelineIT.java index 3752824a1..98fff981a 100644 --- a/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/e2e/AbstractJdbcSinkPipelineIT.java +++ b/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/e2e/AbstractJdbcSinkPipelineIT.java @@ -36,6 +36,7 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.kafka.connect.sink.SinkRecord; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -2575,16 +2576,7 @@ public void testTimestampWithTimeZoneDataTypeWithInfinityValue(Source source, Si final List values = List.of("'-infinity'", "'infinity'"); - List expectedValues = List.of(); - if (sink.getType().is(SinkType.SQLSERVER)) { - - expectedValues = List.of(ZonedDateTime.of(1, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC), - ZonedDateTime.of(9999, 12, 31, 23, 59, 59, 0, ZoneOffset.UTC)); - } - else if (sink.getType().is(SinkType.MYSQL)) { - expectedValues = List.of(ZonedDateTime.of(1970, 1, 1, 0, 0, 1, 0, ZoneOffset.UTC), - ZonedDateTime.of(2038, 1, 19, 3, 14, 7, 0, ZoneOffset.UTC)); - } + List expectedValues = getExpectedZonedDateTimes(sink); assertDataTypesNonKeyOnly(source, sink, @@ -2598,6 +2590,30 @@ else if (sink.getType().is(SinkType.MYSQL)) { (rs, index) -> rs.getTimestamp(index).toInstant().atZone(ZoneOffset.UTC)); } + private static @NotNull List getExpectedZonedDateTimes(Sink sink) { + + List expectedValues = List.of(); + if (sink.getType().is(SinkType.SQLSERVER)) { + + expectedValues = List.of(ZonedDateTime.of(1, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC), + ZonedDateTime.of(9999, 12, 31, 23, 59, 59, 0, ZoneOffset.UTC)); + } + else if (sink.getType().is(SinkType.MYSQL)) { + + expectedValues = List.of(ZonedDateTime.of(1970, 1, 1, 0, 0, 1, 0, ZoneOffset.UTC), + ZonedDateTime.of(2038, 1, 19, 3, 14, 7, 0, ZoneOffset.UTC)); + } + else if (sink.getType().is(SinkType.ORACLE)) { + + // The value read by the rs.getTimestamp() is correct but then the + // rs.getTimestamp().toInstant() will return -4712-11-24. I suspect a bug somewhere in the time library. + // The value on the DB is correct since select to_char(A , 'AD YYYY-MM-DD HH24:MI:SS') will return BC 4712-01-01 00:00:00 + expectedValues = List.of(ZonedDateTime.of(-4712, 11, 24, 0, 0, 0, 0, ZoneOffset.UTC), + ZonedDateTime.of(9999, 12, 31, 23, 59, 59, 0, ZoneOffset.UTC)); + } + return expectedValues; + } + // todo: remaining data types need tests and/or type system mapping support // GEOMETRY (MySql/PostgreSQL) // LINESTRING (MySQL)