diff --git a/COPYRIGHT.txt b/COPYRIGHT.txt index 14dec1967..994dab4b0 100644 --- a/COPYRIGHT.txt +++ b/COPYRIGHT.txt @@ -317,6 +317,7 @@ Thomas Prelle Thomas Thornton Tin Nguyen Tom Bentley +Tom Billiet Tomaz Lemos Fernandes Tommy Karlsson Tony Rizko diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java index 1c46e13a4..c3203dd0e 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java @@ -93,6 +93,16 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector .withValidation(OracleConnectorConfig::validateOutServerName) .withDescription("Name of the XStream Out server to connect to."); + public static final Field INTERVAL_HANDLING_MODE = Field.create("interval.handling.mode") + .withDisplayName("Interval Handling") + .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 21)) + .withEnum(IntervalHandlingMode.class, IntervalHandlingMode.NUMERIC) + .withWidth(Width.MEDIUM) + .withImportance(Importance.LOW) + .withDescription("Specify how INTERVAL columns should be represented in change events, including:" + + "'string' represents values as an exact ISO formatted string" + + "'numeric' (default) represents values using the inexact conversion into microseconds"); + public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode") .withDisplayName("Snapshot mode") .withEnum(SnapshotMode.class, SnapshotMode.INITIAL) @@ -413,6 +423,7 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector SNAPSHOT_ENHANCEMENT_TOKEN, SNAPSHOT_LOCKING_MODE, RAC_NODES, + INTERVAL_HANDLING_MODE, LOG_MINING_ARCHIVE_LOG_HOURS, LOG_MINING_BATCH_SIZE_DEFAULT, LOG_MINING_BATCH_SIZE_MIN, @@ -457,6 +468,7 @@ public static ConfigDef configDef() { private final String databaseName; private final String pdbName; private final String xoutServerName; + private final IntervalHandlingMode intervalHandlingMode; private final SnapshotMode snapshotMode; private final String oracleVersion; @@ -496,6 +508,7 @@ public OracleConnectorConfig(Configuration config) { this.databaseName = toUpperCase(config.getString(DATABASE_NAME)); this.pdbName = toUpperCase(config.getString(PDB_NAME)); this.xoutServerName = config.getString(XSTREAM_SERVER_NAME); + this.intervalHandlingMode = IntervalHandlingMode.parse(config.getString(INTERVAL_HANDLING_MODE)); this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE)); this.oracleVersion = config.getString(ORACLE_VERSION); this.snapshotEnhancementToken = config.getString(SNAPSHOT_ENHANCEMENT_TOKEN); @@ -552,6 +565,10 @@ public String getXoutServerName() { return xoutServerName; } + public IntervalHandlingMode getIntervalHandlingMode() { + return intervalHandlingMode; + } + public SnapshotMode getSnapshotMode() { return snapshotMode; } @@ -569,6 +586,67 @@ protected HistoryRecordComparator getHistoryRecordComparator() { return getAdapter().getHistoryRecordComparator(); } + /** + * Defines modes of representation of {@code interval} datatype + */ + public enum IntervalHandlingMode implements EnumeratedValue { + + /** + * Represents interval as inexact microseconds count + */ + NUMERIC("numeric"), + + /** + * Represents interval as ISO 8601 time interval + */ + STRING("string"); + + private final String value; + + IntervalHandlingMode(String value) { + this.value = value; + } + + @Override + public String getValue() { + return value; + } + + /** + * Convert mode name into the logical value + * + * @param value the configuration property value ; may not be null + * @return the matching option, or null if the match is not found + */ + public static IntervalHandlingMode parse(String value) { + if (value == null) { + return null; + } + value = value.trim(); + for (IntervalHandlingMode option : IntervalHandlingMode.values()) { + if (option.getValue().equalsIgnoreCase(value)) { + return option; + } + } + return null; + } + + /** + * Convert mode name into the logical value + * + * @param value the configuration property value ; may not be null + * @param defaultValue the default value ; may be null + * @return the matching option or null if the match is not found and non-null default is invalid + */ + public static IntervalHandlingMode parse(String value, String defaultValue) { + IntervalHandlingMode mode = parse(value); + if (mode == null && defaultValue != null) { + mode = parse(defaultValue); + } + return mode; + } + } + /** * The set of predefined SnapshotMode options or aliases. */ diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java index 7dbb44ae7..f8fd80011 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java @@ -37,6 +37,7 @@ import io.debezium.relational.Column; import io.debezium.relational.ValueConverter; import io.debezium.time.Date; +import io.debezium.time.Interval; import io.debezium.time.MicroDuration; import io.debezium.time.ZonedTimestamp; import io.debezium.util.NumberConversions; @@ -106,9 +107,11 @@ public class OracleValueConverters extends JdbcValueConverters { private static final Pattern TO_TIMESTAMP = Pattern.compile("TO_TIMESTAMP\\('(.*)'\\)", Pattern.CASE_INSENSITIVE); private static final Pattern TO_TIMESTAMP_TZ = Pattern.compile("TO_TIMESTAMP_TZ\\('(.*)'\\)", Pattern.CASE_INSENSITIVE); private static final Pattern TO_DATE = Pattern.compile("TO_DATE\\('(.*)',[ ]*'(.*)'\\)", Pattern.CASE_INSENSITIVE); + private static final BigDecimal MICROSECONDS_PER_SECOND = new BigDecimal(1_000_000); private final OracleConnection connection; private final boolean lobEnabled; + private final OracleConnectorConfig.IntervalHandlingMode intervalHandlingMode; private final byte[] unavailableValuePlaceholderBinary; private final String unavailableValuePlaceholderString; @@ -116,6 +119,7 @@ public OracleValueConverters(OracleConnectorConfig config, OracleConnection conn super(config.getDecimalMode(), config.getTemporalPrecisionMode(), ZoneOffset.UTC, null, null, null); this.connection = connection; this.lobEnabled = config.isLobEnabled(); + this.intervalHandlingMode = config.getIntervalHandlingMode(); this.unavailableValuePlaceholderBinary = config.getUnavailableValuePlaceholder(); this.unavailableValuePlaceholderString = new String(config.getUnavailableValuePlaceholder()); } @@ -144,7 +148,7 @@ public SchemaBuilder schemaBuilder(Column column) { return ZonedTimestamp.builder(); case OracleTypes.INTERVALYM: case OracleTypes.INTERVALDS: - return MicroDuration.builder(); + return intervalHandlingMode == OracleConnectorConfig.IntervalHandlingMode.STRING ? Interval.builder() : MicroDuration.builder(); case Types.STRUCT: return SchemaBuilder.string(); default: { @@ -651,7 +655,13 @@ protected Object convertIntervalYearMonth(Column column, Field fieldDefn, Object return convertValue(column, fieldDefn, data, NumberConversions.LONG_FALSE, (r) -> { if (data instanceof Number) { // we expect to get back from the plugin a double value - r.deliver(((Number) data).longValue()); + final long micros = ((Number) data).longValue(); + if (intervalHandlingMode == OracleConnectorConfig.IntervalHandlingMode.STRING) { + r.deliver(Interval.toIsoString(0, 0, 0, 0, 0, new BigDecimal(micros).divide(MICROSECONDS_PER_SECOND))); + } + else { + r.deliver(micros); + } } else if (data instanceof INTERVALYM) { convertOracleIntervalYearMonth(data, r); @@ -677,8 +687,13 @@ private void convertOracleIntervalYearMonth(Object data, ResultReceiver r) { if (interval.charAt(i) == '-') { final int year = sign * Integer.parseInt(interval.substring(start, i)); final int month = sign * Integer.parseInt(interval.substring(i + 1, interval.length())); - r.deliver(MicroDuration.durationMicros(year, month, 0, 0, - 0, 0, MicroDuration.DAYS_PER_MONTH_AVG)); + if (intervalHandlingMode == OracleConnectorConfig.IntervalHandlingMode.STRING) { + r.deliver(Interval.toIsoString(year, month, 0, 0, 0, BigDecimal.ZERO)); + } + else { + r.deliver(MicroDuration.durationMicros(year, month, 0, 0, + 0, 0, MicroDuration.DAYS_PER_MONTH_AVG)); + } } } } @@ -687,7 +702,13 @@ protected Object convertIntervalDaySecond(Column column, Field fieldDefn, Object return convertValue(column, fieldDefn, data, NumberConversions.LONG_FALSE, (r) -> { if (data instanceof Number) { // we expect to get back from the plugin a double value - r.deliver(((Number) data).longValue()); + final long micros = ((Number) data).longValue(); + if (intervalHandlingMode == OracleConnectorConfig.IntervalHandlingMode.STRING) { + r.deliver(Interval.toIsoString(0, 0, 0, 0, 0, new BigDecimal(micros).divide(MICROSECONDS_PER_SECOND))); + } + else { + r.deliver(micros); + } } else if (data instanceof INTERVALDS) { convertOracleIntervalDaySecond(data, r); @@ -706,15 +727,28 @@ private void convertOracleIntervalDaySecond(Object data, ResultReceiver r) { final Matcher m = INTERVAL_DAY_SECOND_PATTERN.matcher(interval); if (m.matches()) { final int sign = "-".equals(m.group(1)) ? -1 : 1; - r.deliver(MicroDuration.durationMicros( - 0, - 0, - sign * Integer.valueOf(m.group(2)), - sign * Integer.valueOf(m.group(3)), - sign * Integer.valueOf(m.group(4)), - sign * Integer.valueOf(m.group(5)), - sign * Integer.valueOf(Strings.pad(m.group(6), 6, '0')), - MicroDuration.DAYS_PER_MONTH_AVG)); + if (intervalHandlingMode == OracleConnectorConfig.IntervalHandlingMode.STRING) { + double seconds = (double) (sign * Integer.parseInt(m.group(5))) + + (double) Integer.parseInt(Strings.pad(m.group(6), 6, '0')) / 1_000_000D; + r.deliver(Interval.toIsoString( + 0, + 0, + sign * Integer.valueOf(m.group(2)), + sign * Integer.valueOf(m.group(3)), + sign * Integer.valueOf(m.group(4)), + BigDecimal.valueOf(seconds))); + } + else { + r.deliver(MicroDuration.durationMicros( + 0, + 0, + sign * Integer.valueOf(m.group(2)), + sign * Integer.valueOf(m.group(3)), + sign * Integer.valueOf(m.group(4)), + sign * Integer.valueOf(m.group(5)), + sign * Integer.valueOf(Strings.pad(m.group(6), 6, '0')), + MicroDuration.DAYS_PER_MONTH_AVG)); + } } } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java index bb9a8f312..11260c3b0 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java @@ -11,6 +11,7 @@ import static io.debezium.connector.oracle.util.TestHelper.defaultConfig; import static io.debezium.data.Envelope.FieldName.AFTER; import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.fail; import static org.fest.assertions.Assertions.assertThat; import static org.fest.assertions.MapAssert.entry; @@ -947,6 +948,220 @@ public void shouldPropagateSourceTypeByDatatype() throws Exception { entry(TYPE_LENGTH_PARAMETER_KEY, "10")); } + @Test + @FixFor("DBZ-1539") + public void shouldHandlerIntervalTypesAsInt64() throws Exception { + // Drop table if it exists + TestHelper.dropTable(connection, "debezium.interval"); + + try { + // complex ddl + final String ddl = "create table debezium.interval (" + + " id numeric(6) constraint interval_id_nn not null, " + + " intYM interval year to month," + + " intYM2 interval year(9) to month," + // max precision + " intDS interval day to second, " + + " intDS2 interval day(9) to second(9), " + // max precision + " constraint interval_pk primary key(id)" + + ")"; + + // create table + connection.execute(ddl); + connection.execute("GRANT SELECT ON debezium.interval to " + TestHelper.getConnectorUserName()); + connection.execute("ALTER TABLE debezium.interval ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"); + + // Insert a snapshot record + connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) " + + "values (1, INTERVAL '2' YEAR, INTERVAL '555-4' YEAR(3) TO MONTH, " + + "INTERVAL '3' DAY, INTERVAL '111 10:09:08.555444333' DAY(3) TO SECOND(9))"); + connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) " + + "values (2, INTERVAL '0' YEAR, INTERVAL '0' MONTH, " + + "INTERVAL '0' DAY, INTERVAL '0' SECOND)"); + connection.commit(); + + final Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.INTERVAL") + .with(OracleConnectorConfig.LOG_MINING_STRATEGY, "online_catalog") + .build(); + + // Perform a basic startup & initial snapshot of data + start(OracleConnector.class, config); + assertConnectorIsRunning(); + + waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + // Verify record generated during snapshot + final List records = consumeRecordsByTopic(2).recordsForTopic("server1.DEBEZIUM.INTERVAL"); + assertThat(records.size()).isEqualTo(2); + records.forEach(rec -> { + Struct after = ((Struct) rec.value()).getStruct(AFTER); + Struct key = (Struct) rec.key(); + switch ((int) key.get("ID")) { + case 1: + assertThat(after.getInt64("INTYM")).isEqualTo(63115200000000L); + assertThat(after.getInt64("INTYM2")).isEqualTo(17524987200000000L); + assertThat(after.getInt64("INTDS")).isEqualTo(259200000000L); + assertThat(after.getInt64("INTDS2")).isEqualTo(9627503444333L); + break; + case 2: + assertThat(after.getInt64("INTYM")).isEqualTo(0L); + assertThat(after.getInt64("INTYM2")).isEqualTo(0L); + assertThat(after.getInt64("INTDS")).isEqualTo(0L); + assertThat(after.getInt64("INTDS2")).isEqualTo(0L); + break; + default: + fail("unexpected id"); + } + }); + + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) " + + "values (3, INTERVAL '2' YEAR, INTERVAL '555-4' YEAR(3) TO MONTH, " + + "INTERVAL '3' DAY, INTERVAL '111 10:09:08.555444333' DAY(3) TO SECOND(9))"); + connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) " + + "values (4, INTERVAL '0' YEAR, INTERVAL '0' MONTH, " + + "INTERVAL '0' DAY, INTERVAL '0' SECOND)"); + connection.commit(); + + // Verify record generated during streaming + List streamingRecords = consumeRecordsByTopic(2).recordsForTopic("server1.DEBEZIUM.INTERVAL"); + assertThat(streamingRecords.size()).isEqualTo(2); + + streamingRecords.forEach(rec -> { + Struct after = ((Struct) rec.value()).getStruct(AFTER); + Struct key = (Struct) rec.key(); + switch ((int) key.get("ID")) { + case 3: + assertThat(after.getInt64("INTYM")).isEqualTo(63115200000000L); + assertThat(after.getInt64("INTYM2")).isEqualTo(17524987200000000L); + assertThat(after.getInt64("INTDS")).isEqualTo(259200000000L); + assertThat(after.getInt64("INTDS2")).isEqualTo(9627503444333L); + break; + case 4: + assertThat(after.getInt64("INTYM")).isEqualTo(0L); + assertThat(after.getInt64("INTYM2")).isEqualTo(0L); + assertThat(after.getInt64("INTDS")).isEqualTo(0L); + assertThat(after.getInt64("INTDS2")).isEqualTo(0L); + break; + default: + fail("unexpected id"); + } + }); + } + finally { + TestHelper.dropTable(connection, "debezium.interval"); + } + } + + @Test + @FixFor("DBZ-1539") + public void shouldHandlerIntervalTypesAsString() throws Exception { + // Drop table if it exists + TestHelper.dropTable(connection, "debezium.interval"); + + try { + // complex ddl + final String ddl = "create table debezium.interval (" + + " id numeric(6) constraint interval_id_nn not null, " + + " intYM interval year to month," + + " intYM2 interval year(9) to month," + // max precision + " intDS interval day to second, " + + " intDS2 interval day(9) to second(9), " + // max precision + " constraint interval_pk primary key(id)" + + ")"; + + // create table + connection.execute(ddl); + connection.execute("GRANT SELECT ON debezium.interval to " + TestHelper.getConnectorUserName()); + connection.execute("ALTER TABLE debezium.interval ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"); + + // Insert a snapshot record + connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) " + + "values (1, INTERVAL '2' YEAR, INTERVAL '555-4' YEAR(3) TO MONTH, " + + "INTERVAL '3' DAY, INTERVAL '111 10:09:08.555444333' DAY(3) TO SECOND(9))"); + connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) " + + "values (2, INTERVAL '0' YEAR, INTERVAL '0' MONTH, " + + "INTERVAL '0' DAY, INTERVAL '0' SECOND)"); + connection.commit(); + + final Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.INTERVAL") + .with(OracleConnectorConfig.LOG_MINING_STRATEGY, "online_catalog") + .with(OracleConnectorConfig.INTERVAL_HANDLING_MODE, + OracleConnectorConfig.IntervalHandlingMode.STRING.getValue()) + .build(); + + // Perform a basic startup & initial snapshot of data + start(OracleConnector.class, config); + assertConnectorIsRunning(); + + waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + // Verify record generated during snapshot + final List records = consumeRecordsByTopic(2).recordsForTopic("server1.DEBEZIUM.INTERVAL"); + assertThat(records.size()).isEqualTo(2); + records.forEach(rec -> { + Struct after = ((Struct) rec.value()).getStruct(AFTER); + Struct key = (Struct) rec.key(); + switch ((int) key.get("ID")) { + case 1: + assertThat(after.getString("INTYM")).isEqualTo("P2Y0M0DT0H0M0S"); + assertThat(after.getString("INTYM2")).isEqualTo("P555Y4M0DT0H0M0S"); + assertThat(after.getString("INTDS")).isEqualTo("P0Y0M3DT0H0M0S"); + assertThat(after.getString("INTDS2")).isEqualTo("P0Y0M111DT10H9M563.444333S"); + break; + case 2: + assertThat(after.getString("INTYM")).isEqualTo("P0Y0M0DT0H0M0S"); + assertThat(after.getString("INTYM2")).isEqualTo("P0Y0M0DT0H0M0S"); + assertThat(after.getString("INTDS")).isEqualTo("P0Y0M0DT0H0M0S"); + assertThat(after.getString("INTDS2")).isEqualTo("P0Y0M0DT0H0M0S"); + break; + default: + fail("unexpected id"); + } + }); + + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) " + + "values (3, INTERVAL '2' YEAR, INTERVAL '555-4' YEAR(3) TO MONTH, " + + "INTERVAL '3' DAY, INTERVAL '111 10:09:08.555444333' DAY(3) TO SECOND(9))"); + connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) " + + "values (4, INTERVAL '0' YEAR, INTERVAL '0' MONTH, " + + "INTERVAL '0' DAY, INTERVAL '0' SECOND)"); + connection.commit(); + + // Verify record generated during streaming + List streamingRecords = consumeRecordsByTopic(2).recordsForTopic("server1.DEBEZIUM.INTERVAL"); + assertThat(streamingRecords.size()).isEqualTo(2); + + streamingRecords.forEach(rec -> { + Struct after = ((Struct) rec.value()).getStruct(AFTER); + Struct key = (Struct) rec.key(); + switch ((int) key.get("ID")) { + case 3: + assertThat(after.getString("INTYM")).isEqualTo("P2Y0M0DT0H0M0S"); + assertThat(after.getString("INTYM2")).isEqualTo("P555Y4M0DT0H0M0S"); + assertThat(after.getString("INTDS")).isEqualTo("P0Y0M3DT0H0M0S"); + assertThat(after.getString("INTDS2")).isEqualTo("P0Y0M111DT10H9M563.444333S"); + break; + case 4: + assertThat(after.getString("INTYM")).isEqualTo("P0Y0M0DT0H0M0S"); + assertThat(after.getString("INTYM2")).isEqualTo("P0Y0M0DT0H0M0S"); + assertThat(after.getString("INTDS")).isEqualTo("P0Y0M0DT0H0M0S"); + assertThat(after.getString("INTDS2")).isEqualTo("P0Y0M0DT0H0M0S"); + break; + default: + fail("unexpected id"); + } + }); + } + finally { + TestHelper.dropTable(connection, "debezium.interval"); + } + } + @Test @FixFor("DBZ-2624") public void shouldSnapshotAndStreamChangesFromTableWithNumericDefaultValues() throws Exception { diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleDefaultValueIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleDefaultValueIT.java index e2dd209a9..658a3462d 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleDefaultValueIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleDefaultValueIT.java @@ -27,6 +27,7 @@ import io.debezium.doc.FixFor; import io.debezium.embedded.AbstractConnectorTest; import io.debezium.junit.logging.LogInterceptor; +import io.debezium.time.Interval; import io.debezium.time.MicroDuration; import io.debezium.util.Testing; @@ -39,9 +40,12 @@ public class OracleDefaultValueIT extends AbstractConnectorTest { private OracleConnection connection; + private Consumer configUpdater; @Before public void before() throws Exception { + configUpdater = builder -> { + }; connection = TestHelper.testConnection(); setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS); initializeConnectorTestFramework(); @@ -274,6 +278,28 @@ public void shouldHandleIntervalDefaultTypes() throws Exception { shouldHandleDefaultValuesCommon(columnDefinitions); } + @Test + @FixFor("DBZ-1539") + public void shouldHandleIntervalDefaultTypesAsString() throws Exception { + configUpdater = builder -> { + builder.with(OracleConnectorConfig.INTERVAL_HANDLING_MODE, + OracleConnectorConfig.IntervalHandlingMode.STRING.getValue()); + }; + List columnDefinitions = Arrays.asList( + new ColumnDefinition("val_int_ytm", "interval year to month", + "'5-3'", "'7-4'", + getOracleIntervalYearMonthString(5, 3), + getOracleIntervalYearMonthString(7, 4), + AssertionType.FIELD_DEFAULT_EQUAL), + new ColumnDefinition("val_int_dts", "interval day(3) to second(3)", + "'5 1:2:3.456'", "'3 2:1:4.567'", + getOracleIntervalDaySecondString(5, 1, 2, 3, 456000), + getOracleIntervalDaySecondString(3, 2, 1, 4, 567000), + AssertionType.FIELD_DEFAULT_EQUAL)); + + shouldHandleDefaultValuesCommon(columnDefinitions); + } + @Test @FixFor("DBZ-4208") public void shouldHandleDefaultValueFromSequencesAsNoDefault() throws Exception { @@ -294,10 +320,19 @@ private long getOracleIntervalYearMonth(int years, int month) { return MicroDuration.durationMicros(years, month, 0, 0, 0, 0, MicroDuration.DAYS_PER_MONTH_AVG); } + private String getOracleIntervalYearMonthString(int years, int month) { + return Interval.toIsoString(years, month, 0, 0, 0, BigDecimal.ZERO); + } + private long getOracleIntervalDaySecond(int days, int hours, int minutes, int seconds, int micros) { return MicroDuration.durationMicros(0, 0, days, hours, minutes, seconds, micros, MicroDuration.DAYS_PER_MONTH_AVG); } + private String getOracleIntervalDaySecondString(int days, int hours, int minutes, int seconds, int micros) { + double secondsDouble = (double) seconds + (double) micros / 1_000_000D; + return Interval.toIsoString(0, 0, days, hours, minutes, BigDecimal.valueOf(secondsDouble)); + } + /** * Handles executing the full common set of default value tests for the supplied column definitions. * @@ -356,6 +391,7 @@ private void testDefaultValuesCreateTableAndSnapshot(List colu Configuration config = TestHelper.defaultConfig() .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DEFAULT_VALUE_TEST") + .apply(configUpdater) .build(); // Start connector diff --git a/documentation/modules/ROOT/pages/connectors/oracle.adoc b/documentation/modules/ROOT/pages/connectors/oracle.adoc index 86a19aa39..74c1e50f0 100644 --- a/documentation/modules/ROOT/pages/connectors/oracle.adoc +++ b/documentation/modules/ROOT/pages/connectors/oracle.adoc @@ -1258,13 +1258,21 @@ Represents the number of milliseconds past epoch, and does not include timezone |`FLOAT64` |`io.debezium.time.MicroDuration` + + -The number of micro seconds for a time interval using the `365.25 / 12.0` formula for days per month average. +The number of micro seconds for a time interval using the `365.25 / 12.0` formula for days per month average. + + + +`io.debezium.time.Interval` (when `interval.handling.mode` is set to `string`) + + + +The string representation of the interval value that follows the pattern `PYMDTHMS`, for example, `P1Y2M3DT4H5M6.78S`. |`INTERVAL YEAR[(M)] TO MONTH` |`FLOAT64` |`io.debezium.time.MicroDuration` + + -The number of micro seconds for a time interval using the `365.25 / 12.0` formula for days per month average. +The number of micro seconds for a time interval using the `365.25 / 12.0` formula for days per month average. + + + +`io.debezium.time.Interval` (when `interval.handling.mode` is set to `string`) + + + +The string representation of the interval value that follows the pattern `PYMDTHMS`, for example, `P1Y2M3DT4H5M6.78S`. |`TIMESTAMP(0 - 3)` |`INT64` @@ -1316,13 +1324,21 @@ Represents the number of days since the epoch. |`FLOAT64` |`io.debezium.time.MicroDuration` + + -The number of micro seconds for a time interval using the `365.25 / 12.0` formula for days per month average. +The number of micro seconds for a time interval using the `365.25 / 12.0` formula for days per month average. + + + +`io.debezium.time.Interval` (when `interval.handling.mode` is set to `string`) + + + +The string representation of the interval value that follows the pattern `PYMDTHMS`, for example, `P1Y2M3DT4H5M6.78S`. |`INTERVAL YEAR[(M)] TO MONTH` |`FLOAT64` |`io.debezium.time.MicroDuration` + + -The number of micro seconds for a time interval using the `365.25 / 12.0` formula for days per month average. +The number of micro seconds for a time interval using the `365.25 / 12.0` formula for days per month average. + + + +`io.debezium.time.Interval` (when `interval.handling.mode` is set to `string`) + + + +The string representation of the interval value that follows the pattern `PYMDTHMS`, for example, `P1Y2M3DT4H5M6.78S`. |`TIMESTAMP(0 - 3)` |`INT64` @@ -2255,6 +2271,14 @@ You can set one of the following options: Using the `string` option is easier to consume, but results in a loss of semantic information about the real type. For more information, see <>. +|[[oracle-property-interval-handling-mode]]<> +|`numeric` +| Specifies how the connector should handle values for `interval` columns: + + + +`numeric` represents intervals using approximate number of microseconds. + + + +`string` represents intervals exactly by using the string pattern representation `PYMDTHMS`. For example: `P1Y2M3DT4H5M6.78S`. + |[[oracle-property-event-processing-failure-handling-mode]]<> |`fail` | Specifies how the connector should react to exceptions during processing of events. diff --git a/jenkins-jobs/scripts/config/Aliases.txt b/jenkins-jobs/scripts/config/Aliases.txt index ca1d6944d..72bb0ce02 100644 --- a/jenkins-jobs/scripts/config/Aliases.txt +++ b/jenkins-jobs/scripts/config/Aliases.txt @@ -104,3 +104,4 @@ lujiefsi,陆杰 ahodavdekar,Abhishek Hodavdekar josetesan,Jose Luis yw,Yang Wu +TomBillietKlarrio,Tom Billiet