DBZ-1539 Add interval.handling.mode to oracle connector

This commit is contained in:
Tom Billiet 2021-11-24 11:45:56 +01:00 committed by Gunnar Morling
parent 2157256398
commit 46b47a42b2
7 changed files with 407 additions and 18 deletions

View File

@ -317,6 +317,7 @@ Thomas Prelle
Thomas Thornton Thomas Thornton
Tin Nguyen Tin Nguyen
Tom Bentley Tom Bentley
Tom Billiet
Tomaz Lemos Fernandes Tomaz Lemos Fernandes
Tommy Karlsson Tommy Karlsson
Tony Rizko Tony Rizko

View File

@ -93,6 +93,16 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
.withValidation(OracleConnectorConfig::validateOutServerName) .withValidation(OracleConnectorConfig::validateOutServerName)
.withDescription("Name of the XStream Out server to connect to."); .withDescription("Name of the XStream Out server to connect to.");
public static final Field INTERVAL_HANDLING_MODE = Field.create("interval.handling.mode")
.withDisplayName("Interval Handling")
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 21))
.withEnum(IntervalHandlingMode.class, IntervalHandlingMode.NUMERIC)
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withDescription("Specify how INTERVAL columns should be represented in change events, including:"
+ "'string' represents values as an exact ISO formatted string"
+ "'numeric' (default) represents values using the inexact conversion into microseconds");
public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode") public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode")
.withDisplayName("Snapshot mode") .withDisplayName("Snapshot mode")
.withEnum(SnapshotMode.class, SnapshotMode.INITIAL) .withEnum(SnapshotMode.class, SnapshotMode.INITIAL)
@ -413,6 +423,7 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
SNAPSHOT_ENHANCEMENT_TOKEN, SNAPSHOT_ENHANCEMENT_TOKEN,
SNAPSHOT_LOCKING_MODE, SNAPSHOT_LOCKING_MODE,
RAC_NODES, RAC_NODES,
INTERVAL_HANDLING_MODE,
LOG_MINING_ARCHIVE_LOG_HOURS, LOG_MINING_ARCHIVE_LOG_HOURS,
LOG_MINING_BATCH_SIZE_DEFAULT, LOG_MINING_BATCH_SIZE_DEFAULT,
LOG_MINING_BATCH_SIZE_MIN, LOG_MINING_BATCH_SIZE_MIN,
@ -457,6 +468,7 @@ public static ConfigDef configDef() {
private final String databaseName; private final String databaseName;
private final String pdbName; private final String pdbName;
private final String xoutServerName; private final String xoutServerName;
private final IntervalHandlingMode intervalHandlingMode;
private final SnapshotMode snapshotMode; private final SnapshotMode snapshotMode;
private final String oracleVersion; private final String oracleVersion;
@ -496,6 +508,7 @@ public OracleConnectorConfig(Configuration config) {
this.databaseName = toUpperCase(config.getString(DATABASE_NAME)); this.databaseName = toUpperCase(config.getString(DATABASE_NAME));
this.pdbName = toUpperCase(config.getString(PDB_NAME)); this.pdbName = toUpperCase(config.getString(PDB_NAME));
this.xoutServerName = config.getString(XSTREAM_SERVER_NAME); this.xoutServerName = config.getString(XSTREAM_SERVER_NAME);
this.intervalHandlingMode = IntervalHandlingMode.parse(config.getString(INTERVAL_HANDLING_MODE));
this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE)); this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE));
this.oracleVersion = config.getString(ORACLE_VERSION); this.oracleVersion = config.getString(ORACLE_VERSION);
this.snapshotEnhancementToken = config.getString(SNAPSHOT_ENHANCEMENT_TOKEN); this.snapshotEnhancementToken = config.getString(SNAPSHOT_ENHANCEMENT_TOKEN);
@ -552,6 +565,10 @@ public String getXoutServerName() {
return xoutServerName; return xoutServerName;
} }
public IntervalHandlingMode getIntervalHandlingMode() {
return intervalHandlingMode;
}
public SnapshotMode getSnapshotMode() { public SnapshotMode getSnapshotMode() {
return snapshotMode; return snapshotMode;
} }
@ -569,6 +586,67 @@ protected HistoryRecordComparator getHistoryRecordComparator() {
return getAdapter().getHistoryRecordComparator(); return getAdapter().getHistoryRecordComparator();
} }
/**
* Defines modes of representation of {@code interval} datatype
*/
public enum IntervalHandlingMode implements EnumeratedValue {
/**
* Represents interval as inexact microseconds count
*/
NUMERIC("numeric"),
/**
* Represents interval as ISO 8601 time interval
*/
STRING("string");
private final String value;
IntervalHandlingMode(String value) {
this.value = value;
}
@Override
public String getValue() {
return value;
}
/**
* Convert mode name into the logical value
*
* @param value the configuration property value ; may not be null
* @return the matching option, or null if the match is not found
*/
public static IntervalHandlingMode parse(String value) {
if (value == null) {
return null;
}
value = value.trim();
for (IntervalHandlingMode option : IntervalHandlingMode.values()) {
if (option.getValue().equalsIgnoreCase(value)) {
return option;
}
}
return null;
}
/**
* Convert mode name into the logical value
*
* @param value the configuration property value ; may not be null
* @param defaultValue the default value ; may be null
* @return the matching option or null if the match is not found and non-null default is invalid
*/
public static IntervalHandlingMode parse(String value, String defaultValue) {
IntervalHandlingMode mode = parse(value);
if (mode == null && defaultValue != null) {
mode = parse(defaultValue);
}
return mode;
}
}
/** /**
* The set of predefined SnapshotMode options or aliases. * The set of predefined SnapshotMode options or aliases.
*/ */

View File

@ -37,6 +37,7 @@
import io.debezium.relational.Column; import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter; import io.debezium.relational.ValueConverter;
import io.debezium.time.Date; import io.debezium.time.Date;
import io.debezium.time.Interval;
import io.debezium.time.MicroDuration; import io.debezium.time.MicroDuration;
import io.debezium.time.ZonedTimestamp; import io.debezium.time.ZonedTimestamp;
import io.debezium.util.NumberConversions; import io.debezium.util.NumberConversions;
@ -106,9 +107,11 @@ public class OracleValueConverters extends JdbcValueConverters {
private static final Pattern TO_TIMESTAMP = Pattern.compile("TO_TIMESTAMP\\('(.*)'\\)", Pattern.CASE_INSENSITIVE); private static final Pattern TO_TIMESTAMP = Pattern.compile("TO_TIMESTAMP\\('(.*)'\\)", Pattern.CASE_INSENSITIVE);
private static final Pattern TO_TIMESTAMP_TZ = Pattern.compile("TO_TIMESTAMP_TZ\\('(.*)'\\)", Pattern.CASE_INSENSITIVE); private static final Pattern TO_TIMESTAMP_TZ = Pattern.compile("TO_TIMESTAMP_TZ\\('(.*)'\\)", Pattern.CASE_INSENSITIVE);
private static final Pattern TO_DATE = Pattern.compile("TO_DATE\\('(.*)',[ ]*'(.*)'\\)", Pattern.CASE_INSENSITIVE); private static final Pattern TO_DATE = Pattern.compile("TO_DATE\\('(.*)',[ ]*'(.*)'\\)", Pattern.CASE_INSENSITIVE);
private static final BigDecimal MICROSECONDS_PER_SECOND = new BigDecimal(1_000_000);
private final OracleConnection connection; private final OracleConnection connection;
private final boolean lobEnabled; private final boolean lobEnabled;
private final OracleConnectorConfig.IntervalHandlingMode intervalHandlingMode;
private final byte[] unavailableValuePlaceholderBinary; private final byte[] unavailableValuePlaceholderBinary;
private final String unavailableValuePlaceholderString; private final String unavailableValuePlaceholderString;
@ -116,6 +119,7 @@ public OracleValueConverters(OracleConnectorConfig config, OracleConnection conn
super(config.getDecimalMode(), config.getTemporalPrecisionMode(), ZoneOffset.UTC, null, null, null); super(config.getDecimalMode(), config.getTemporalPrecisionMode(), ZoneOffset.UTC, null, null, null);
this.connection = connection; this.connection = connection;
this.lobEnabled = config.isLobEnabled(); this.lobEnabled = config.isLobEnabled();
this.intervalHandlingMode = config.getIntervalHandlingMode();
this.unavailableValuePlaceholderBinary = config.getUnavailableValuePlaceholder(); this.unavailableValuePlaceholderBinary = config.getUnavailableValuePlaceholder();
this.unavailableValuePlaceholderString = new String(config.getUnavailableValuePlaceholder()); this.unavailableValuePlaceholderString = new String(config.getUnavailableValuePlaceholder());
} }
@ -144,7 +148,7 @@ public SchemaBuilder schemaBuilder(Column column) {
return ZonedTimestamp.builder(); return ZonedTimestamp.builder();
case OracleTypes.INTERVALYM: case OracleTypes.INTERVALYM:
case OracleTypes.INTERVALDS: case OracleTypes.INTERVALDS:
return MicroDuration.builder(); return intervalHandlingMode == OracleConnectorConfig.IntervalHandlingMode.STRING ? Interval.builder() : MicroDuration.builder();
case Types.STRUCT: case Types.STRUCT:
return SchemaBuilder.string(); return SchemaBuilder.string();
default: { default: {
@ -651,7 +655,13 @@ protected Object convertIntervalYearMonth(Column column, Field fieldDefn, Object
return convertValue(column, fieldDefn, data, NumberConversions.LONG_FALSE, (r) -> { return convertValue(column, fieldDefn, data, NumberConversions.LONG_FALSE, (r) -> {
if (data instanceof Number) { if (data instanceof Number) {
// we expect to get back from the plugin a double value // we expect to get back from the plugin a double value
r.deliver(((Number) data).longValue()); final long micros = ((Number) data).longValue();
if (intervalHandlingMode == OracleConnectorConfig.IntervalHandlingMode.STRING) {
r.deliver(Interval.toIsoString(0, 0, 0, 0, 0, new BigDecimal(micros).divide(MICROSECONDS_PER_SECOND)));
}
else {
r.deliver(micros);
}
} }
else if (data instanceof INTERVALYM) { else if (data instanceof INTERVALYM) {
convertOracleIntervalYearMonth(data, r); convertOracleIntervalYearMonth(data, r);
@ -677,8 +687,13 @@ private void convertOracleIntervalYearMonth(Object data, ResultReceiver r) {
if (interval.charAt(i) == '-') { if (interval.charAt(i) == '-') {
final int year = sign * Integer.parseInt(interval.substring(start, i)); final int year = sign * Integer.parseInt(interval.substring(start, i));
final int month = sign * Integer.parseInt(interval.substring(i + 1, interval.length())); final int month = sign * Integer.parseInt(interval.substring(i + 1, interval.length()));
r.deliver(MicroDuration.durationMicros(year, month, 0, 0, if (intervalHandlingMode == OracleConnectorConfig.IntervalHandlingMode.STRING) {
0, 0, MicroDuration.DAYS_PER_MONTH_AVG)); r.deliver(Interval.toIsoString(year, month, 0, 0, 0, BigDecimal.ZERO));
}
else {
r.deliver(MicroDuration.durationMicros(year, month, 0, 0,
0, 0, MicroDuration.DAYS_PER_MONTH_AVG));
}
} }
} }
} }
@ -687,7 +702,13 @@ protected Object convertIntervalDaySecond(Column column, Field fieldDefn, Object
return convertValue(column, fieldDefn, data, NumberConversions.LONG_FALSE, (r) -> { return convertValue(column, fieldDefn, data, NumberConversions.LONG_FALSE, (r) -> {
if (data instanceof Number) { if (data instanceof Number) {
// we expect to get back from the plugin a double value // we expect to get back from the plugin a double value
r.deliver(((Number) data).longValue()); final long micros = ((Number) data).longValue();
if (intervalHandlingMode == OracleConnectorConfig.IntervalHandlingMode.STRING) {
r.deliver(Interval.toIsoString(0, 0, 0, 0, 0, new BigDecimal(micros).divide(MICROSECONDS_PER_SECOND)));
}
else {
r.deliver(micros);
}
} }
else if (data instanceof INTERVALDS) { else if (data instanceof INTERVALDS) {
convertOracleIntervalDaySecond(data, r); convertOracleIntervalDaySecond(data, r);
@ -706,15 +727,28 @@ private void convertOracleIntervalDaySecond(Object data, ResultReceiver r) {
final Matcher m = INTERVAL_DAY_SECOND_PATTERN.matcher(interval); final Matcher m = INTERVAL_DAY_SECOND_PATTERN.matcher(interval);
if (m.matches()) { if (m.matches()) {
final int sign = "-".equals(m.group(1)) ? -1 : 1; final int sign = "-".equals(m.group(1)) ? -1 : 1;
r.deliver(MicroDuration.durationMicros( if (intervalHandlingMode == OracleConnectorConfig.IntervalHandlingMode.STRING) {
0, double seconds = (double) (sign * Integer.parseInt(m.group(5)))
0, + (double) Integer.parseInt(Strings.pad(m.group(6), 6, '0')) / 1_000_000D;
sign * Integer.valueOf(m.group(2)), r.deliver(Interval.toIsoString(
sign * Integer.valueOf(m.group(3)), 0,
sign * Integer.valueOf(m.group(4)), 0,
sign * Integer.valueOf(m.group(5)), sign * Integer.valueOf(m.group(2)),
sign * Integer.valueOf(Strings.pad(m.group(6), 6, '0')), sign * Integer.valueOf(m.group(3)),
MicroDuration.DAYS_PER_MONTH_AVG)); sign * Integer.valueOf(m.group(4)),
BigDecimal.valueOf(seconds)));
}
else {
r.deliver(MicroDuration.durationMicros(
0,
0,
sign * Integer.valueOf(m.group(2)),
sign * Integer.valueOf(m.group(3)),
sign * Integer.valueOf(m.group(4)),
sign * Integer.valueOf(m.group(5)),
sign * Integer.valueOf(Strings.pad(m.group(6), 6, '0')),
MicroDuration.DAYS_PER_MONTH_AVG));
}
} }
} }

View File

@ -11,6 +11,7 @@
import static io.debezium.connector.oracle.util.TestHelper.defaultConfig; import static io.debezium.connector.oracle.util.TestHelper.defaultConfig;
import static io.debezium.data.Envelope.FieldName.AFTER; import static io.debezium.data.Envelope.FieldName.AFTER;
import static junit.framework.TestCase.assertEquals; import static junit.framework.TestCase.assertEquals;
import static junit.framework.TestCase.fail;
import static org.fest.assertions.Assertions.assertThat; import static org.fest.assertions.Assertions.assertThat;
import static org.fest.assertions.MapAssert.entry; import static org.fest.assertions.MapAssert.entry;
@ -947,6 +948,220 @@ public void shouldPropagateSourceTypeByDatatype() throws Exception {
entry(TYPE_LENGTH_PARAMETER_KEY, "10")); entry(TYPE_LENGTH_PARAMETER_KEY, "10"));
} }
@Test
@FixFor("DBZ-1539")
public void shouldHandlerIntervalTypesAsInt64() throws Exception {
// Drop table if it exists
TestHelper.dropTable(connection, "debezium.interval");
try {
// complex ddl
final String ddl = "create table debezium.interval (" +
" id numeric(6) constraint interval_id_nn not null, " +
" intYM interval year to month," +
" intYM2 interval year(9) to month," + // max precision
" intDS interval day to second, " +
" intDS2 interval day(9) to second(9), " + // max precision
" constraint interval_pk primary key(id)" +
")";
// create table
connection.execute(ddl);
connection.execute("GRANT SELECT ON debezium.interval to " + TestHelper.getConnectorUserName());
connection.execute("ALTER TABLE debezium.interval ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
// Insert a snapshot record
connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) "
+ "values (1, INTERVAL '2' YEAR, INTERVAL '555-4' YEAR(3) TO MONTH, "
+ "INTERVAL '3' DAY, INTERVAL '111 10:09:08.555444333' DAY(3) TO SECOND(9))");
connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) "
+ "values (2, INTERVAL '0' YEAR, INTERVAL '0' MONTH, "
+ "INTERVAL '0' DAY, INTERVAL '0' SECOND)");
connection.commit();
final Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.INTERVAL")
.with(OracleConnectorConfig.LOG_MINING_STRATEGY, "online_catalog")
.build();
// Perform a basic startup & initial snapshot of data
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Verify record generated during snapshot
final List<SourceRecord> records = consumeRecordsByTopic(2).recordsForTopic("server1.DEBEZIUM.INTERVAL");
assertThat(records.size()).isEqualTo(2);
records.forEach(rec -> {
Struct after = ((Struct) rec.value()).getStruct(AFTER);
Struct key = (Struct) rec.key();
switch ((int) key.get("ID")) {
case 1:
assertThat(after.getInt64("INTYM")).isEqualTo(63115200000000L);
assertThat(after.getInt64("INTYM2")).isEqualTo(17524987200000000L);
assertThat(after.getInt64("INTDS")).isEqualTo(259200000000L);
assertThat(after.getInt64("INTDS2")).isEqualTo(9627503444333L);
break;
case 2:
assertThat(after.getInt64("INTYM")).isEqualTo(0L);
assertThat(after.getInt64("INTYM2")).isEqualTo(0L);
assertThat(after.getInt64("INTDS")).isEqualTo(0L);
assertThat(after.getInt64("INTDS2")).isEqualTo(0L);
break;
default:
fail("unexpected id");
}
});
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) "
+ "values (3, INTERVAL '2' YEAR, INTERVAL '555-4' YEAR(3) TO MONTH, "
+ "INTERVAL '3' DAY, INTERVAL '111 10:09:08.555444333' DAY(3) TO SECOND(9))");
connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) "
+ "values (4, INTERVAL '0' YEAR, INTERVAL '0' MONTH, "
+ "INTERVAL '0' DAY, INTERVAL '0' SECOND)");
connection.commit();
// Verify record generated during streaming
List<SourceRecord> streamingRecords = consumeRecordsByTopic(2).recordsForTopic("server1.DEBEZIUM.INTERVAL");
assertThat(streamingRecords.size()).isEqualTo(2);
streamingRecords.forEach(rec -> {
Struct after = ((Struct) rec.value()).getStruct(AFTER);
Struct key = (Struct) rec.key();
switch ((int) key.get("ID")) {
case 3:
assertThat(after.getInt64("INTYM")).isEqualTo(63115200000000L);
assertThat(after.getInt64("INTYM2")).isEqualTo(17524987200000000L);
assertThat(after.getInt64("INTDS")).isEqualTo(259200000000L);
assertThat(after.getInt64("INTDS2")).isEqualTo(9627503444333L);
break;
case 4:
assertThat(after.getInt64("INTYM")).isEqualTo(0L);
assertThat(after.getInt64("INTYM2")).isEqualTo(0L);
assertThat(after.getInt64("INTDS")).isEqualTo(0L);
assertThat(after.getInt64("INTDS2")).isEqualTo(0L);
break;
default:
fail("unexpected id");
}
});
}
finally {
TestHelper.dropTable(connection, "debezium.interval");
}
}
@Test
@FixFor("DBZ-1539")
public void shouldHandlerIntervalTypesAsString() throws Exception {
// Drop table if it exists
TestHelper.dropTable(connection, "debezium.interval");
try {
// complex ddl
final String ddl = "create table debezium.interval (" +
" id numeric(6) constraint interval_id_nn not null, " +
" intYM interval year to month," +
" intYM2 interval year(9) to month," + // max precision
" intDS interval day to second, " +
" intDS2 interval day(9) to second(9), " + // max precision
" constraint interval_pk primary key(id)" +
")";
// create table
connection.execute(ddl);
connection.execute("GRANT SELECT ON debezium.interval to " + TestHelper.getConnectorUserName());
connection.execute("ALTER TABLE debezium.interval ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
// Insert a snapshot record
connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) "
+ "values (1, INTERVAL '2' YEAR, INTERVAL '555-4' YEAR(3) TO MONTH, "
+ "INTERVAL '3' DAY, INTERVAL '111 10:09:08.555444333' DAY(3) TO SECOND(9))");
connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) "
+ "values (2, INTERVAL '0' YEAR, INTERVAL '0' MONTH, "
+ "INTERVAL '0' DAY, INTERVAL '0' SECOND)");
connection.commit();
final Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.INTERVAL")
.with(OracleConnectorConfig.LOG_MINING_STRATEGY, "online_catalog")
.with(OracleConnectorConfig.INTERVAL_HANDLING_MODE,
OracleConnectorConfig.IntervalHandlingMode.STRING.getValue())
.build();
// Perform a basic startup & initial snapshot of data
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Verify record generated during snapshot
final List<SourceRecord> records = consumeRecordsByTopic(2).recordsForTopic("server1.DEBEZIUM.INTERVAL");
assertThat(records.size()).isEqualTo(2);
records.forEach(rec -> {
Struct after = ((Struct) rec.value()).getStruct(AFTER);
Struct key = (Struct) rec.key();
switch ((int) key.get("ID")) {
case 1:
assertThat(after.getString("INTYM")).isEqualTo("P2Y0M0DT0H0M0S");
assertThat(after.getString("INTYM2")).isEqualTo("P555Y4M0DT0H0M0S");
assertThat(after.getString("INTDS")).isEqualTo("P0Y0M3DT0H0M0S");
assertThat(after.getString("INTDS2")).isEqualTo("P0Y0M111DT10H9M563.444333S");
break;
case 2:
assertThat(after.getString("INTYM")).isEqualTo("P0Y0M0DT0H0M0S");
assertThat(after.getString("INTYM2")).isEqualTo("P0Y0M0DT0H0M0S");
assertThat(after.getString("INTDS")).isEqualTo("P0Y0M0DT0H0M0S");
assertThat(after.getString("INTDS2")).isEqualTo("P0Y0M0DT0H0M0S");
break;
default:
fail("unexpected id");
}
});
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) "
+ "values (3, INTERVAL '2' YEAR, INTERVAL '555-4' YEAR(3) TO MONTH, "
+ "INTERVAL '3' DAY, INTERVAL '111 10:09:08.555444333' DAY(3) TO SECOND(9))");
connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) "
+ "values (4, INTERVAL '0' YEAR, INTERVAL '0' MONTH, "
+ "INTERVAL '0' DAY, INTERVAL '0' SECOND)");
connection.commit();
// Verify record generated during streaming
List<SourceRecord> streamingRecords = consumeRecordsByTopic(2).recordsForTopic("server1.DEBEZIUM.INTERVAL");
assertThat(streamingRecords.size()).isEqualTo(2);
streamingRecords.forEach(rec -> {
Struct after = ((Struct) rec.value()).getStruct(AFTER);
Struct key = (Struct) rec.key();
switch ((int) key.get("ID")) {
case 3:
assertThat(after.getString("INTYM")).isEqualTo("P2Y0M0DT0H0M0S");
assertThat(after.getString("INTYM2")).isEqualTo("P555Y4M0DT0H0M0S");
assertThat(after.getString("INTDS")).isEqualTo("P0Y0M3DT0H0M0S");
assertThat(after.getString("INTDS2")).isEqualTo("P0Y0M111DT10H9M563.444333S");
break;
case 4:
assertThat(after.getString("INTYM")).isEqualTo("P0Y0M0DT0H0M0S");
assertThat(after.getString("INTYM2")).isEqualTo("P0Y0M0DT0H0M0S");
assertThat(after.getString("INTDS")).isEqualTo("P0Y0M0DT0H0M0S");
assertThat(after.getString("INTDS2")).isEqualTo("P0Y0M0DT0H0M0S");
break;
default:
fail("unexpected id");
}
});
}
finally {
TestHelper.dropTable(connection, "debezium.interval");
}
}
@Test @Test
@FixFor("DBZ-2624") @FixFor("DBZ-2624")
public void shouldSnapshotAndStreamChangesFromTableWithNumericDefaultValues() throws Exception { public void shouldSnapshotAndStreamChangesFromTableWithNumericDefaultValues() throws Exception {

View File

@ -27,6 +27,7 @@
import io.debezium.doc.FixFor; import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest; import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.logging.LogInterceptor; import io.debezium.junit.logging.LogInterceptor;
import io.debezium.time.Interval;
import io.debezium.time.MicroDuration; import io.debezium.time.MicroDuration;
import io.debezium.util.Testing; import io.debezium.util.Testing;
@ -39,9 +40,12 @@
public class OracleDefaultValueIT extends AbstractConnectorTest { public class OracleDefaultValueIT extends AbstractConnectorTest {
private OracleConnection connection; private OracleConnection connection;
private Consumer<Configuration.Builder> configUpdater;
@Before @Before
public void before() throws Exception { public void before() throws Exception {
configUpdater = builder -> {
};
connection = TestHelper.testConnection(); connection = TestHelper.testConnection();
setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS); setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
initializeConnectorTestFramework(); initializeConnectorTestFramework();
@ -274,6 +278,28 @@ public void shouldHandleIntervalDefaultTypes() throws Exception {
shouldHandleDefaultValuesCommon(columnDefinitions); shouldHandleDefaultValuesCommon(columnDefinitions);
} }
@Test
@FixFor("DBZ-1539")
public void shouldHandleIntervalDefaultTypesAsString() throws Exception {
configUpdater = builder -> {
builder.with(OracleConnectorConfig.INTERVAL_HANDLING_MODE,
OracleConnectorConfig.IntervalHandlingMode.STRING.getValue());
};
List<ColumnDefinition> columnDefinitions = Arrays.asList(
new ColumnDefinition("val_int_ytm", "interval year to month",
"'5-3'", "'7-4'",
getOracleIntervalYearMonthString(5, 3),
getOracleIntervalYearMonthString(7, 4),
AssertionType.FIELD_DEFAULT_EQUAL),
new ColumnDefinition("val_int_dts", "interval day(3) to second(3)",
"'5 1:2:3.456'", "'3 2:1:4.567'",
getOracleIntervalDaySecondString(5, 1, 2, 3, 456000),
getOracleIntervalDaySecondString(3, 2, 1, 4, 567000),
AssertionType.FIELD_DEFAULT_EQUAL));
shouldHandleDefaultValuesCommon(columnDefinitions);
}
@Test @Test
@FixFor("DBZ-4208") @FixFor("DBZ-4208")
public void shouldHandleDefaultValueFromSequencesAsNoDefault() throws Exception { public void shouldHandleDefaultValueFromSequencesAsNoDefault() throws Exception {
@ -294,10 +320,19 @@ private long getOracleIntervalYearMonth(int years, int month) {
return MicroDuration.durationMicros(years, month, 0, 0, 0, 0, MicroDuration.DAYS_PER_MONTH_AVG); return MicroDuration.durationMicros(years, month, 0, 0, 0, 0, MicroDuration.DAYS_PER_MONTH_AVG);
} }
private String getOracleIntervalYearMonthString(int years, int month) {
return Interval.toIsoString(years, month, 0, 0, 0, BigDecimal.ZERO);
}
private long getOracleIntervalDaySecond(int days, int hours, int minutes, int seconds, int micros) { private long getOracleIntervalDaySecond(int days, int hours, int minutes, int seconds, int micros) {
return MicroDuration.durationMicros(0, 0, days, hours, minutes, seconds, micros, MicroDuration.DAYS_PER_MONTH_AVG); return MicroDuration.durationMicros(0, 0, days, hours, minutes, seconds, micros, MicroDuration.DAYS_PER_MONTH_AVG);
} }
private String getOracleIntervalDaySecondString(int days, int hours, int minutes, int seconds, int micros) {
double secondsDouble = (double) seconds + (double) micros / 1_000_000D;
return Interval.toIsoString(0, 0, days, hours, minutes, BigDecimal.valueOf(secondsDouble));
}
/** /**
* Handles executing the full common set of default value tests for the supplied column definitions. * Handles executing the full common set of default value tests for the supplied column definitions.
* *
@ -356,6 +391,7 @@ private void testDefaultValuesCreateTableAndSnapshot(List<ColumnDefinition> colu
Configuration config = TestHelper.defaultConfig() Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DEFAULT_VALUE_TEST") .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DEFAULT_VALUE_TEST")
.apply(configUpdater)
.build(); .build();
// Start connector // Start connector

View File

@ -1258,13 +1258,21 @@ Represents the number of milliseconds past epoch, and does not include timezone
|`FLOAT64` |`FLOAT64`
|`io.debezium.time.MicroDuration` + |`io.debezium.time.MicroDuration` +
+ +
The number of micro seconds for a time interval using the `365.25 / 12.0` formula for days per month average. The number of micro seconds for a time interval using the `365.25 / 12.0` formula for days per month average. +
+
`io.debezium.time.Interval` (when `interval.handling.mode` is set to `string`) +
+
The string representation of the interval value that follows the pattern `P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S`, for example, `P1Y2M3DT4H5M6.78S`.
|`INTERVAL YEAR[(M)] TO MONTH` |`INTERVAL YEAR[(M)] TO MONTH`
|`FLOAT64` |`FLOAT64`
|`io.debezium.time.MicroDuration` + |`io.debezium.time.MicroDuration` +
+ +
The number of micro seconds for a time interval using the `365.25 / 12.0` formula for days per month average. The number of micro seconds for a time interval using the `365.25 / 12.0` formula for days per month average. +
+
`io.debezium.time.Interval` (when `interval.handling.mode` is set to `string`) +
+
The string representation of the interval value that follows the pattern `P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S`, for example, `P1Y2M3DT4H5M6.78S`.
|`TIMESTAMP(0 - 3)` |`TIMESTAMP(0 - 3)`
|`INT64` |`INT64`
@ -1316,13 +1324,21 @@ Represents the number of days since the epoch.
|`FLOAT64` |`FLOAT64`
|`io.debezium.time.MicroDuration` + |`io.debezium.time.MicroDuration` +
+ +
The number of micro seconds for a time interval using the `365.25 / 12.0` formula for days per month average. The number of micro seconds for a time interval using the `365.25 / 12.0` formula for days per month average. +
+
`io.debezium.time.Interval` (when `interval.handling.mode` is set to `string`) +
+
The string representation of the interval value that follows the pattern `P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S`, for example, `P1Y2M3DT4H5M6.78S`.
|`INTERVAL YEAR[(M)] TO MONTH` |`INTERVAL YEAR[(M)] TO MONTH`
|`FLOAT64` |`FLOAT64`
|`io.debezium.time.MicroDuration` + |`io.debezium.time.MicroDuration` +
+ +
The number of micro seconds for a time interval using the `365.25 / 12.0` formula for days per month average. The number of micro seconds for a time interval using the `365.25 / 12.0` formula for days per month average. +
+
`io.debezium.time.Interval` (when `interval.handling.mode` is set to `string`) +
+
The string representation of the interval value that follows the pattern `P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S`, for example, `P1Y2M3DT4H5M6.78S`.
|`TIMESTAMP(0 - 3)` |`TIMESTAMP(0 - 3)`
|`INT64` |`INT64`
@ -2255,6 +2271,14 @@ You can set one of the following options:
Using the `string` option is easier to consume, but results in a loss of semantic information about the real type. Using the `string` option is easier to consume, but results in a loss of semantic information about the real type.
For more information, see <<oracle-decimal-types>>. For more information, see <<oracle-decimal-types>>.
|[[oracle-property-interval-handling-mode]]<<oracle-property-interval-handling-mode, `+interval.handling.mode+`>>
|`numeric`
| Specifies how the connector should handle values for `interval` columns: +
+
`numeric` represents intervals using approximate number of microseconds. +
+
`string` represents intervals exactly by using the string pattern representation `P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S`. For example: `P1Y2M3DT4H5M6.78S`.
|[[oracle-property-event-processing-failure-handling-mode]]<<oracle-property-event-processing-failure-handling-mode, `+event.processing.failure.handling.mode+`>> |[[oracle-property-event-processing-failure-handling-mode]]<<oracle-property-event-processing-failure-handling-mode, `+event.processing.failure.handling.mode+`>>
|`fail` |`fail`
| Specifies how the connector should react to exceptions during processing of events. | Specifies how the connector should react to exceptions during processing of events.

View File

@ -104,3 +104,4 @@ lujiefsi,陆杰
ahodavdekar,Abhishek Hodavdekar ahodavdekar,Abhishek Hodavdekar
josetesan,Jose Luis josetesan,Jose Luis
yw,Yang Wu yw,Yang Wu
TomBillietKlarrio,Tom Billiet