DBZ-20 Support for day to second interval
This commit is contained in:
parent
89694ffbbf
commit
232a1d5573
@ -19,6 +19,7 @@
|
||||
import io.debezium.connector.oracle.parser.PlSqlParser.Column_definitionContext;
|
||||
import io.debezium.connector.oracle.parser.PlSqlParser.Column_nameContext;
|
||||
import io.debezium.connector.oracle.parser.PlSqlParser.Create_tableContext;
|
||||
import io.debezium.connector.oracle.parser.PlSqlParser.ExpressionContext;
|
||||
import io.debezium.connector.oracle.parser.PlSqlParser.Out_of_line_constraintContext;
|
||||
import io.debezium.connector.oracle.parser.PlSqlParser.Precision_partContext;
|
||||
import io.debezium.connector.oracle.parser.PlSqlParser.Tableview_nameContext;
|
||||
@ -231,6 +232,7 @@ else if (ctx.datatype().native_datatype_element().NUMBER() != null) {
|
||||
}
|
||||
}
|
||||
else if (ctx.datatype().INTERVAL() != null
|
||||
&& ctx.datatype().YEAR() != null
|
||||
&& ctx.datatype().TO() != null
|
||||
&& ctx.datatype().MONTH() != null) {
|
||||
columnEditor
|
||||
@ -241,6 +243,27 @@ else if (ctx.datatype().INTERVAL() != null
|
||||
columnEditor.length(Integer.valueOf((ctx.datatype().expression(0).getText())));
|
||||
}
|
||||
}
|
||||
else if (ctx.datatype().INTERVAL() != null
|
||||
&& ctx.datatype().DAY() != null
|
||||
&& ctx.datatype().TO() != null
|
||||
&& ctx.datatype().SECOND() != null) {
|
||||
columnEditor
|
||||
.jdbcType(OracleTypes.INTERVALDS)
|
||||
.type("INTERVAL DAY TO SECOND")
|
||||
.length(2)
|
||||
.scale(6);
|
||||
for (final ExpressionContext e: ctx.datatype().expression()) {
|
||||
if (e.getSourceInterval().startsAfter(ctx.datatype().TO().getSourceInterval())) {
|
||||
columnEditor.scale(Integer.valueOf(e.getText()));
|
||||
}
|
||||
else {
|
||||
columnEditor.length(Integer.valueOf(e.getText()));
|
||||
}
|
||||
}
|
||||
if (!ctx.datatype().expression().isEmpty()) {
|
||||
columnEditor.length(Integer.valueOf((ctx.datatype().expression(0).getText())));
|
||||
}
|
||||
}
|
||||
else {
|
||||
throw new IllegalArgumentException("Unsupported column type: " + ctx.datatype().getText());
|
||||
}
|
||||
@ -270,7 +293,7 @@ public void exitOut_of_line_constraint(Out_of_line_constraintContext ctx) {
|
||||
.collect(Collectors.toList());
|
||||
|
||||
editor.setPrimaryKeyNames(pkColumnNames);
|
||||
}
|
||||
}
|
||||
|
||||
super.exitOut_of_line_constraint(ctx);
|
||||
}
|
||||
|
@ -9,7 +9,10 @@
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Types;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.kafka.connect.data.Field;
|
||||
import org.apache.kafka.connect.data.SchemaBuilder;
|
||||
|
||||
@ -26,6 +29,7 @@
|
||||
import oracle.sql.BINARY_FLOAT;
|
||||
import oracle.sql.CHAR;
|
||||
import oracle.sql.DATE;
|
||||
import oracle.sql.INTERVALDS;
|
||||
import oracle.sql.INTERVALYM;
|
||||
import oracle.sql.NUMBER;
|
||||
import oracle.sql.TIMESTAMP;
|
||||
@ -34,6 +38,7 @@
|
||||
|
||||
public class OracleValueConverters extends JdbcValueConverters {
|
||||
private static int NUMBER_VARIABLE_SCALE_LENGTH = 0;
|
||||
private static final Pattern INTERVAL_DAY_SECOND_PATTERN = Pattern.compile("([+\\-])?(\\d+) (\\d+):(\\d+):(\\d+).(\\d+)");
|
||||
|
||||
private final OracleConnection connection;
|
||||
|
||||
@ -67,6 +72,7 @@ public SchemaBuilder schemaBuilder(Column column) {
|
||||
case OracleTypes.TIMESTAMPLTZ:
|
||||
return ZonedTimestamp.builder();
|
||||
case OracleTypes.INTERVALYM:
|
||||
case OracleTypes.INTERVALDS:
|
||||
return MicroDuration.builder();
|
||||
default:
|
||||
return super.schemaBuilder(column);
|
||||
@ -95,7 +101,9 @@ public ValueConverter converter(Column column, Field fieldDefn) {
|
||||
case OracleTypes.TIMESTAMPLTZ:
|
||||
return (data) -> convertTimestampWithZone(column, fieldDefn, data);
|
||||
case OracleTypes.INTERVALYM:
|
||||
return (data) -> convertInterval(column, fieldDefn, data);
|
||||
return (data) -> convertIntervalYearMonth(column, fieldDefn, data);
|
||||
case OracleTypes.INTERVALDS:
|
||||
return (data) -> convertIntervalDaySecond(column, fieldDefn, data);
|
||||
}
|
||||
|
||||
return super.converter(column, fieldDefn);
|
||||
@ -244,7 +252,7 @@ protected Object convertTimestampWithZone(Column column, Field fieldDefn, Object
|
||||
return super.convertTimestampWithZone(column, fieldDefn, fromOracleTimeClasses(column, data));
|
||||
}
|
||||
|
||||
protected Object convertInterval(Column column, Field fieldDefn, Object data) {
|
||||
protected Object convertIntervalYearMonth(Column column, Field fieldDefn, Object data) {
|
||||
if (data == null) {
|
||||
data = fieldDefn.schema().defaultValue();
|
||||
}
|
||||
@ -275,4 +283,36 @@ protected Object convertInterval(Column column, Field fieldDefn, Object data) {
|
||||
}
|
||||
return handleUnknownData(column, fieldDefn, data);
|
||||
}
|
||||
|
||||
protected Object convertIntervalDaySecond(Column column, Field fieldDefn, Object data) {
|
||||
if (data == null) {
|
||||
data = fieldDefn.schema().defaultValue();
|
||||
}
|
||||
if (data == null) {
|
||||
if (column.isOptional()) return null;
|
||||
return NumberConversions.DOUBLE_FALSE;
|
||||
}
|
||||
if (data instanceof Number) {
|
||||
// we expect to get back from the plugin a double value
|
||||
return ((Number) data).doubleValue();
|
||||
}
|
||||
if (data instanceof INTERVALDS) {
|
||||
final String interval = ((INTERVALDS) data).stringValue();
|
||||
System.err.println(interval);
|
||||
final Matcher m = INTERVAL_DAY_SECOND_PATTERN.matcher(interval);
|
||||
if (m.matches()) {
|
||||
final int sign = "-".equals(m.group(1)) ? -1 : 1;
|
||||
return MicroDuration.durationMicros(
|
||||
0,
|
||||
0,
|
||||
sign * Integer.valueOf(m.group(2)),
|
||||
sign * Integer.valueOf(m.group(3)),
|
||||
sign * Integer.valueOf(m.group(4)),
|
||||
sign * Integer.valueOf(m.group(5)),
|
||||
sign * Integer.valueOf(StringUtils.rightPad(m.group(6), 6, '0')),
|
||||
MicroDuration.DAYS_PER_MONTH_AVG);
|
||||
}
|
||||
}
|
||||
return handleUnknownData(column, fieldDefn, data);
|
||||
}
|
||||
}
|
||||
|
@ -79,6 +79,7 @@ public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest
|
||||
" val_tstz timestamp with time zone, " +
|
||||
" val_tsltz timestamp with local time zone, " +
|
||||
" val_int_ytm interval year to month, " +
|
||||
" val_int_dts interval day(3) to second(2), " +
|
||||
" primary key (id)" +
|
||||
")";
|
||||
|
||||
@ -109,7 +110,8 @@ public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest
|
||||
new SchemaAndValueField("VAL_DATE", Timestamp.builder().optional().build(), 1522108800_000l),
|
||||
new SchemaAndValueField("VAL_TS", MicroTimestamp.builder().optional().build(), LocalDateTime.of(2018, 3, 27, 12, 34, 56).toEpochSecond(ZoneOffset.UTC) * 1_000_000 + 7890),
|
||||
new SchemaAndValueField("VAL_TSTZ", ZonedTimestamp.builder().optional().build(), "2018-03-27T01:34:56.00789-11:00"),
|
||||
new SchemaAndValueField("VAL_INT_YTM", MicroDuration.builder().optional().build(), -110451600_000_000.0)
|
||||
new SchemaAndValueField("VAL_INT_YTM", MicroDuration.builder().optional().build(), -110451600_000_000.0),
|
||||
new SchemaAndValueField("VAL_INT_DTS", MicroDuration.builder().optional().build(), -93784_560_000.0)
|
||||
// new SchemaAndValueField("VAL_TSLTZ", ZonedTimestamp.builder().optional().build(), "2018-03-27T01:34:56.00789-11:00")
|
||||
);
|
||||
|
||||
@ -220,7 +222,7 @@ public void intTypes() throws Exception {
|
||||
@Test
|
||||
public void timeTypes() throws Exception {
|
||||
int expectedRecordCount = 0;
|
||||
connection.execute("INSERT INTO debezium.type_time VALUES (1, '27-MAR-2018', '27-MAR-2018 12:34:56.00789', '27-MAR-2018 01:34:56.00789 am -11:00', '27-MAR-2018 01:34:56.00789', INTERVAL '-3-6' YEAR TO MONTH)");
|
||||
connection.execute("INSERT INTO debezium.type_time VALUES (1, '27-MAR-2018', '27-MAR-2018 12:34:56.00789', '27-MAR-2018 01:34:56.00789 am -11:00', '27-MAR-2018 01:34:56.00789', INTERVAL '-3-6' YEAR TO MONTH, INTERVAL '-1 2:3:4.56' DAY TO SECOND)");
|
||||
connection.execute("COMMIT");
|
||||
|
||||
Testing.debug("Inserted");
|
||||
|
@ -68,17 +68,36 @@ private MicroDuration() {
|
||||
* @param hours a number of hours
|
||||
* @param minutes a number of minutes
|
||||
* @param seconds a number of seconds
|
||||
* @param micros a number of microseconds
|
||||
* @param daysPerMonthAvg an optional value representing a days per month average; if null, the default duration
|
||||
* from {@link ChronoUnit#MONTHS} is used.
|
||||
* @return a {@link BigDecimal} value which contains the number of microseconds, never {@code null}
|
||||
*/
|
||||
*/
|
||||
public static double durationMicros(int years, int months, int days, int hours, int minutes, double seconds,
|
||||
Double daysPerMonthAvg) {
|
||||
int micros, Double daysPerMonthAvg) {
|
||||
if (daysPerMonthAvg == null) {
|
||||
daysPerMonthAvg = (double) ChronoUnit.MONTHS.getDuration().toDays();
|
||||
}
|
||||
double numberOfDays = ((years * 12) + months) * daysPerMonthAvg + days;
|
||||
double numberOfSeconds = (((numberOfDays * 24 + hours) * 60) + minutes) * 60 + seconds;
|
||||
return numberOfSeconds * 1e6;
|
||||
return numberOfSeconds * 1e6 + micros;
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a number of time units to a duration in microseconds.
|
||||
*
|
||||
* @param years a number of years
|
||||
* @param months a number of months
|
||||
* @param days a number of days
|
||||
* @param hours a number of hours
|
||||
* @param minutes a number of minutes
|
||||
* @param seconds a number of seconds
|
||||
* @param daysPerMonthAvg an optional value representing a days per month average; if null, the default duration
|
||||
* from {@link ChronoUnit#MONTHS} is used.
|
||||
* @return a {@link BigDecimal} value which contains the number of microseconds, never {@code null}
|
||||
*/
|
||||
public static double durationMicros(int years, int months, int days, int hours, int minutes, double seconds,
|
||||
Double daysPerMonthAvg) {
|
||||
return durationMicros(years, months, days, hours, minutes, seconds, 0, daysPerMonthAvg);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user