diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDdlParser.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDdlParser.java index df8472024..23490a0cb 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDdlParser.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDdlParser.java @@ -19,6 +19,7 @@ import io.debezium.connector.oracle.parser.PlSqlParser.Column_definitionContext; import io.debezium.connector.oracle.parser.PlSqlParser.Column_nameContext; import io.debezium.connector.oracle.parser.PlSqlParser.Create_tableContext; +import io.debezium.connector.oracle.parser.PlSqlParser.ExpressionContext; import io.debezium.connector.oracle.parser.PlSqlParser.Out_of_line_constraintContext; import io.debezium.connector.oracle.parser.PlSqlParser.Precision_partContext; import io.debezium.connector.oracle.parser.PlSqlParser.Tableview_nameContext; @@ -231,6 +232,7 @@ else if (ctx.datatype().native_datatype_element().NUMBER() != null) { } } else if (ctx.datatype().INTERVAL() != null + && ctx.datatype().YEAR() != null && ctx.datatype().TO() != null && ctx.datatype().MONTH() != null) { columnEditor @@ -241,6 +243,27 @@ else if (ctx.datatype().INTERVAL() != null columnEditor.length(Integer.valueOf((ctx.datatype().expression(0).getText()))); } } + else if (ctx.datatype().INTERVAL() != null + && ctx.datatype().DAY() != null + && ctx.datatype().TO() != null + && ctx.datatype().SECOND() != null) { + columnEditor + .jdbcType(OracleTypes.INTERVALDS) + .type("INTERVAL DAY TO SECOND") + .length(2) + .scale(6); + for (final ExpressionContext e: ctx.datatype().expression()) { + if (e.getSourceInterval().startsAfter(ctx.datatype().TO().getSourceInterval())) { + columnEditor.scale(Integer.valueOf(e.getText())); + } + else { + columnEditor.length(Integer.valueOf(e.getText())); + } + } + if (!ctx.datatype().expression().isEmpty()) { + columnEditor.length(Integer.valueOf((ctx.datatype().expression(0).getText()))); + } + } else { throw new IllegalArgumentException("Unsupported column type: " + ctx.datatype().getText()); } @@ -270,7 +293,7 @@ public void exitOut_of_line_constraint(Out_of_line_constraintContext ctx) { .collect(Collectors.toList()); editor.setPrimaryKeyNames(pkColumnNames); - } + } super.exitOut_of_line_constraint(ctx); } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java index a3785ad8a..3ec4df85d 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java @@ -9,7 +9,10 @@ import java.sql.SQLException; import java.sql.Types; import java.time.ZonedDateTime; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.commons.lang3.StringUtils; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.SchemaBuilder; @@ -26,6 +29,7 @@ import oracle.sql.BINARY_FLOAT; import oracle.sql.CHAR; import oracle.sql.DATE; +import oracle.sql.INTERVALDS; import oracle.sql.INTERVALYM; import oracle.sql.NUMBER; import oracle.sql.TIMESTAMP; @@ -34,6 +38,7 @@ public class OracleValueConverters extends JdbcValueConverters { private static int NUMBER_VARIABLE_SCALE_LENGTH = 0; + private static final Pattern INTERVAL_DAY_SECOND_PATTERN = Pattern.compile("([+\\-])?(\\d+) (\\d+):(\\d+):(\\d+).(\\d+)"); private final OracleConnection connection; @@ -67,6 +72,7 @@ public SchemaBuilder schemaBuilder(Column column) { case OracleTypes.TIMESTAMPLTZ: return ZonedTimestamp.builder(); case OracleTypes.INTERVALYM: + case OracleTypes.INTERVALDS: return MicroDuration.builder(); default: return super.schemaBuilder(column); @@ -95,7 +101,9 @@ public ValueConverter converter(Column column, Field fieldDefn) { case OracleTypes.TIMESTAMPLTZ: return (data) -> convertTimestampWithZone(column, fieldDefn, data); case OracleTypes.INTERVALYM: - return (data) -> convertInterval(column, fieldDefn, data); + return (data) -> convertIntervalYearMonth(column, fieldDefn, data); + case OracleTypes.INTERVALDS: + return (data) -> convertIntervalDaySecond(column, fieldDefn, data); } return super.converter(column, fieldDefn); @@ -244,7 +252,7 @@ protected Object convertTimestampWithZone(Column column, Field fieldDefn, Object return super.convertTimestampWithZone(column, fieldDefn, fromOracleTimeClasses(column, data)); } - protected Object convertInterval(Column column, Field fieldDefn, Object data) { + protected Object convertIntervalYearMonth(Column column, Field fieldDefn, Object data) { if (data == null) { data = fieldDefn.schema().defaultValue(); } @@ -275,4 +283,36 @@ protected Object convertInterval(Column column, Field fieldDefn, Object data) { } return handleUnknownData(column, fieldDefn, data); } + + protected Object convertIntervalDaySecond(Column column, Field fieldDefn, Object data) { + if (data == null) { + data = fieldDefn.schema().defaultValue(); + } + if (data == null) { + if (column.isOptional()) return null; + return NumberConversions.DOUBLE_FALSE; + } + if (data instanceof Number) { + // we expect to get back from the plugin a double value + return ((Number) data).doubleValue(); + } + if (data instanceof INTERVALDS) { + final String interval = ((INTERVALDS) data).stringValue(); + System.err.println(interval); + final Matcher m = INTERVAL_DAY_SECOND_PATTERN.matcher(interval); + if (m.matches()) { + final int sign = "-".equals(m.group(1)) ? -1 : 1; + return MicroDuration.durationMicros( + 0, + 0, + sign * Integer.valueOf(m.group(2)), + sign * Integer.valueOf(m.group(3)), + sign * Integer.valueOf(m.group(4)), + sign * Integer.valueOf(m.group(5)), + sign * Integer.valueOf(StringUtils.rightPad(m.group(6), 6, '0')), + MicroDuration.DAYS_PER_MONTH_AVG); + } + } + return handleUnknownData(column, fieldDefn, data); + } } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/AbstractOracleDatatypesTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/AbstractOracleDatatypesTest.java index 648b45358..d2bcc711e 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/AbstractOracleDatatypesTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/AbstractOracleDatatypesTest.java @@ -79,6 +79,7 @@ public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest " val_tstz timestamp with time zone, " + " val_tsltz timestamp with local time zone, " + " val_int_ytm interval year to month, " + + " val_int_dts interval day(3) to second(2), " + " primary key (id)" + ")"; @@ -109,7 +110,8 @@ public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest new SchemaAndValueField("VAL_DATE", Timestamp.builder().optional().build(), 1522108800_000l), new SchemaAndValueField("VAL_TS", MicroTimestamp.builder().optional().build(), LocalDateTime.of(2018, 3, 27, 12, 34, 56).toEpochSecond(ZoneOffset.UTC) * 1_000_000 + 7890), new SchemaAndValueField("VAL_TSTZ", ZonedTimestamp.builder().optional().build(), "2018-03-27T01:34:56.00789-11:00"), - new SchemaAndValueField("VAL_INT_YTM", MicroDuration.builder().optional().build(), -110451600_000_000.0) + new SchemaAndValueField("VAL_INT_YTM", MicroDuration.builder().optional().build(), -110451600_000_000.0), + new SchemaAndValueField("VAL_INT_DTS", MicroDuration.builder().optional().build(), -93784_560_000.0) // new SchemaAndValueField("VAL_TSLTZ", ZonedTimestamp.builder().optional().build(), "2018-03-27T01:34:56.00789-11:00") ); @@ -220,7 +222,7 @@ public void intTypes() throws Exception { @Test public void timeTypes() throws Exception { int expectedRecordCount = 0; - connection.execute("INSERT INTO debezium.type_time VALUES (1, '27-MAR-2018', '27-MAR-2018 12:34:56.00789', '27-MAR-2018 01:34:56.00789 am -11:00', '27-MAR-2018 01:34:56.00789', INTERVAL '-3-6' YEAR TO MONTH)"); + connection.execute("INSERT INTO debezium.type_time VALUES (1, '27-MAR-2018', '27-MAR-2018 12:34:56.00789', '27-MAR-2018 01:34:56.00789 am -11:00', '27-MAR-2018 01:34:56.00789', INTERVAL '-3-6' YEAR TO MONTH, INTERVAL '-1 2:3:4.56' DAY TO SECOND)"); connection.execute("COMMIT"); Testing.debug("Inserted"); diff --git a/debezium-core/src/main/java/io/debezium/time/MicroDuration.java b/debezium-core/src/main/java/io/debezium/time/MicroDuration.java index db2e178a3..a634eb89a 100644 --- a/debezium-core/src/main/java/io/debezium/time/MicroDuration.java +++ b/debezium-core/src/main/java/io/debezium/time/MicroDuration.java @@ -68,17 +68,36 @@ private MicroDuration() { * @param hours a number of hours * @param minutes a number of minutes * @param seconds a number of seconds - * @param daysPerMonthAvg an optional value representing a days per month average; if null, the default duration + * @param micros a number of microseconds + * @param daysPerMonthAvg an optional value representing a days per month average; if null, the default duration * from {@link ChronoUnit#MONTHS} is used. * @return a {@link BigDecimal} value which contains the number of microseconds, never {@code null} - */ - public static double durationMicros(int years, int months, int days, int hours, int minutes, double seconds, - Double daysPerMonthAvg) { + */ + public static double durationMicros(int years, int months, int days, int hours, int minutes, double seconds, + int micros, Double daysPerMonthAvg) { if (daysPerMonthAvg == null) { daysPerMonthAvg = (double) ChronoUnit.MONTHS.getDuration().toDays(); } double numberOfDays = ((years * 12) + months) * daysPerMonthAvg + days; double numberOfSeconds = (((numberOfDays * 24 + hours) * 60) + minutes) * 60 + seconds; - return numberOfSeconds * 1e6; + return numberOfSeconds * 1e6 + micros; + } + + /** + * Converts a number of time units to a duration in microseconds. + * + * @param years a number of years + * @param months a number of months + * @param days a number of days + * @param hours a number of hours + * @param minutes a number of minutes + * @param seconds a number of seconds + * @param daysPerMonthAvg an optional value representing a days per month average; if null, the default duration + * from {@link ChronoUnit#MONTHS} is used. + * @return a {@link BigDecimal} value which contains the number of microseconds, never {@code null} + */ + public static double durationMicros(int years, int months, int days, int hours, int minutes, double seconds, + Double daysPerMonthAvg) { + return durationMicros(years, months, days, hours, minutes, seconds, 0, daysPerMonthAvg); } }