DBZ-8142 Fix conversion of BigInteger
timestamp values
This commit is contained in:
parent
9892a5b374
commit
421be19ffe
@ -5,6 +5,7 @@
|
|||||||
*/
|
*/
|
||||||
package io.debezium.connector.oracle.olr;
|
package io.debezium.connector.oracle.olr;
|
||||||
|
|
||||||
|
import java.math.BigInteger;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.time.OffsetDateTime;
|
import java.time.OffsetDateTime;
|
||||||
@ -62,7 +63,7 @@ protected Object convertNumeric(Column column, Field fieldDefn, Object value) {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Object convertTimestampToEpochMillis(Column column, Field fieldDefn, Object value) {
|
protected Object convertTimestampToEpochMillis(Column column, Field fieldDefn, Object value) {
|
||||||
if (value instanceof Long) {
|
if (value instanceof Number) {
|
||||||
value = convertTimestampValue(column, value);
|
value = convertTimestampValue(column, value);
|
||||||
}
|
}
|
||||||
return super.convertTimestampToEpochMillis(column, fieldDefn, value);
|
return super.convertTimestampToEpochMillis(column, fieldDefn, value);
|
||||||
@ -70,7 +71,7 @@ protected Object convertTimestampToEpochMillis(Column column, Field fieldDefn, O
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Object convertTimestampToEpochMicros(Column column, Field fieldDefn, Object value) {
|
protected Object convertTimestampToEpochMicros(Column column, Field fieldDefn, Object value) {
|
||||||
if (value instanceof Long) {
|
if (value instanceof Number) {
|
||||||
value = convertTimestampValue(column, value);
|
value = convertTimestampValue(column, value);
|
||||||
}
|
}
|
||||||
return super.convertTimestampToEpochMicros(column, fieldDefn, value);
|
return super.convertTimestampToEpochMicros(column, fieldDefn, value);
|
||||||
@ -78,7 +79,7 @@ protected Object convertTimestampToEpochMicros(Column column, Field fieldDefn, O
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Object convertTimestampToEpochNanos(Column column, Field fieldDefn, Object value) {
|
protected Object convertTimestampToEpochNanos(Column column, Field fieldDefn, Object value) {
|
||||||
if (value instanceof Long) {
|
if (value instanceof Number) {
|
||||||
value = convertTimestampValue(column, value);
|
value = convertTimestampValue(column, value);
|
||||||
}
|
}
|
||||||
return super.convertTimestampToEpochNanos(column, fieldDefn, value);
|
return super.convertTimestampToEpochNanos(column, fieldDefn, value);
|
||||||
@ -86,7 +87,7 @@ protected Object convertTimestampToEpochNanos(Column column, Field fieldDefn, Ob
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Object convertTimestampToEpochMillisAsDate(Column column, Field fieldDefn, Object value) {
|
protected Object convertTimestampToEpochMillisAsDate(Column column, Field fieldDefn, Object value) {
|
||||||
if (value instanceof Long) {
|
if (value instanceof Number) {
|
||||||
value = convertTimestampValue(column, value);
|
value = convertTimestampValue(column, value);
|
||||||
}
|
}
|
||||||
return super.convertTimestampToEpochMillisAsDate(column, fieldDefn, value);
|
return super.convertTimestampToEpochMillisAsDate(column, fieldDefn, value);
|
||||||
@ -113,7 +114,7 @@ protected Object convertTimestampWithZone(Column column, Field fieldDefn, Object
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Object convertTimestampWithLocalZone(Column column, Field fieldDefn, Object value) {
|
protected Object convertTimestampWithLocalZone(Column column, Field fieldDefn, Object value) {
|
||||||
if (value instanceof Long) {
|
if (value instanceof Number) {
|
||||||
final Instant instant = Instant.ofEpochSecond(0, (Long) value);
|
final Instant instant = Instant.ofEpochSecond(0, (Long) value);
|
||||||
return getTimestampWithLocalTimeZoneFormatter(column).format(OffsetDateTime.ofInstant(instant, ZoneOffset.UTC));
|
return getTimestampWithLocalTimeZoneFormatter(column).format(OffsetDateTime.ofInstant(instant, ZoneOffset.UTC));
|
||||||
}
|
}
|
||||||
@ -155,7 +156,12 @@ private Object convertTimestampValue(Column column, Object value) {
|
|||||||
if (column.typeName().equalsIgnoreCase(COLUMN_TYPE_DATE)) {
|
if (column.typeName().equalsIgnoreCase(COLUMN_TYPE_DATE)) {
|
||||||
// Value is being provided in nanoseconds based on OpenLogReplicator configuration
|
// Value is being provided in nanoseconds based on OpenLogReplicator configuration
|
||||||
// We need to reduce the column's precision to milliseconds
|
// We need to reduce the column's precision to milliseconds
|
||||||
value = ((Long) value) / 1_000_000L;
|
if (value instanceof BigInteger) {
|
||||||
|
value = ((BigInteger) value).divide(BigInteger.valueOf(1_000_000L)).longValue();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
value = ((Long) value) / 1_000_000L;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
// TIMESTAMP(n)
|
// TIMESTAMP(n)
|
||||||
|
@ -124,6 +124,7 @@ public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest
|
|||||||
" val_tsltz timestamp with local time zone, " +
|
" val_tsltz timestamp with local time zone, " +
|
||||||
" val_int_ytm interval year to month, " +
|
" val_int_ytm interval year to month, " +
|
||||||
" val_int_dts interval day(3) to second(2), " +
|
" val_int_dts interval day(3) to second(2), " +
|
||||||
|
" val_max_date date, " +
|
||||||
" primary key (id)" +
|
" primary key (id)" +
|
||||||
")";
|
")";
|
||||||
|
|
||||||
@ -232,7 +233,8 @@ public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest
|
|||||||
LocalDateTime.of(2018, 3, 27, 1, 34, 56, 7890 * 1_000).atZone(ZoneOffset.systemDefault())
|
LocalDateTime.of(2018, 3, 27, 1, 34, 56, 7890 * 1_000).atZone(ZoneOffset.systemDefault())
|
||||||
.withZoneSameInstant(ZoneOffset.UTC).format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'"))),
|
.withZoneSameInstant(ZoneOffset.UTC).format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'"))),
|
||||||
new SchemaAndValueField("VAL_INT_YTM", MicroDuration.builder().optional().build(), -110451600_000_000L),
|
new SchemaAndValueField("VAL_INT_YTM", MicroDuration.builder().optional().build(), -110451600_000_000L),
|
||||||
new SchemaAndValueField("VAL_INT_DTS", MicroDuration.builder().optional().build(), -93784_560_000L));
|
new SchemaAndValueField("VAL_INT_DTS", MicroDuration.builder().optional().build(), -93784_560_000L),
|
||||||
|
new SchemaAndValueField("VAL_MAX_DATE", Timestamp.builder().optional().build(), 71_863_286_400_000L));
|
||||||
|
|
||||||
private static final List<SchemaAndValueField> EXPECTED_TIME_AS_CONNECT = Arrays.asList(
|
private static final List<SchemaAndValueField> EXPECTED_TIME_AS_CONNECT = Arrays.asList(
|
||||||
new SchemaAndValueField("VAL_DATE", org.apache.kafka.connect.data.Timestamp.builder().optional().build(),
|
new SchemaAndValueField("VAL_DATE", org.apache.kafka.connect.data.Timestamp.builder().optional().build(),
|
||||||
@ -250,7 +252,9 @@ public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest
|
|||||||
LocalDateTime.of(2018, 3, 27, 1, 34, 56, 7890 * 1_000).atZone(ZoneOffset.systemDefault())
|
LocalDateTime.of(2018, 3, 27, 1, 34, 56, 7890 * 1_000).atZone(ZoneOffset.systemDefault())
|
||||||
.withZoneSameInstant(ZoneOffset.UTC).format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'"))),
|
.withZoneSameInstant(ZoneOffset.UTC).format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'"))),
|
||||||
new SchemaAndValueField("VAL_INT_YTM", MicroDuration.builder().optional().build(), -110451600_000_000L),
|
new SchemaAndValueField("VAL_INT_YTM", MicroDuration.builder().optional().build(), -110451600_000_000L),
|
||||||
new SchemaAndValueField("VAL_INT_DTS", MicroDuration.builder().optional().build(), -93784_560_000L));
|
new SchemaAndValueField("VAL_INT_DTS", MicroDuration.builder().optional().build(), -93784_560_000L),
|
||||||
|
new SchemaAndValueField("VAL_MAX_DATE", org.apache.kafka.connect.data.Timestamp.builder().optional().build(),
|
||||||
|
java.util.Date.from(LocalDate.of(4247, 4, 5).atStartOfDay().atOffset(ZoneOffset.UTC).toInstant())));
|
||||||
|
|
||||||
private static final String CLOB_JSON = Testing.Files.readResourceAsString("data/test_lob_data.json");
|
private static final String CLOB_JSON = Testing.Files.readResourceAsString("data/test_lob_data.json");
|
||||||
private static final String NCLOB_JSON = Testing.Files.readResourceAsString("data/test_lob_data2.json");
|
private static final String NCLOB_JSON = Testing.Files.readResourceAsString("data/test_lob_data2.json");
|
||||||
@ -732,6 +736,7 @@ protected static void insertTimeTypes() throws SQLException {
|
|||||||
+ ", TO_TIMESTAMP_TZ('2018-03-27 01:34:56.00789', 'yyyy-mm-dd HH24:MI:SS.FF5')"
|
+ ", TO_TIMESTAMP_TZ('2018-03-27 01:34:56.00789', 'yyyy-mm-dd HH24:MI:SS.FF5')"
|
||||||
+ ", INTERVAL '-3-6' YEAR TO MONTH"
|
+ ", INTERVAL '-3-6' YEAR TO MONTH"
|
||||||
+ ", INTERVAL '-1 2:3:4.56' DAY TO SECOND"
|
+ ", INTERVAL '-1 2:3:4.56' DAY TO SECOND"
|
||||||
|
+ ", TO_DATE('4247-04-05', 'yyyy-mm-dd')"
|
||||||
+ ")");
|
+ ")");
|
||||||
connection.execute("COMMIT");
|
connection.execute("COMMIT");
|
||||||
}
|
}
|
||||||
|
@ -4932,6 +4932,7 @@ sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba <<- EOF
|
|||||||
GRANT SELECT, FLASHBACK ON SYS.TABSUBPART$ TO c##dbzuser;
|
GRANT SELECT, FLASHBACK ON SYS.TABSUBPART$ TO c##dbzuser;
|
||||||
GRANT SELECT, FLASHBACK ON SYS.TS$ TO c##dbzuser;
|
GRANT SELECT, FLASHBACK ON SYS.TS$ TO c##dbzuser;
|
||||||
GRANT SELECT, FLASHBACK ON SYS.USER$ TO c##dbzuser;
|
GRANT SELECT, FLASHBACK ON SYS.USER$ TO c##dbzuser;
|
||||||
|
GRANT SELECT, FLASHBACK ON XDB.XDB$TTSET TO c##dbzuser;
|
||||||
|
|
||||||
exit;
|
exit;
|
||||||
EOF
|
EOF
|
||||||
@ -5002,7 +5003,7 @@ For more information about the required format of this file, see the https://git
|
|||||||
[source,json,indent=0]
|
[source,json,indent=0]
|
||||||
----
|
----
|
||||||
{
|
{
|
||||||
"version": "1.5.0",
|
"version": "1.6.0",
|
||||||
"source": [{
|
"source": [{
|
||||||
"alias": "SOURCE",
|
"alias": "SOURCE",
|
||||||
"name": "ORACLE", <1>
|
"name": "ORACLE", <1>
|
||||||
|
Loading…
Reference in New Issue
Block a user