diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/BaseChangeRecordEmitter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/BaseChangeRecordEmitter.java index a1e73ba97..6a32f5818 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/BaseChangeRecordEmitter.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/BaseChangeRecordEmitter.java @@ -5,9 +5,11 @@ */ package io.debezium.connector.oracle; +import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -86,7 +88,7 @@ protected void emitUpdateAsPrimaryKeyChangeRecord(Receiver receiver, TableSchema connection.setSessionToPdb(connectorConfig.getPdbName()); } connection.prepareQuery(query, - ps -> prepareReselectQueryStatement(ps, table, newKey), + ps -> prepareReselectQueryStatement(ps, table, newColumnValues), rs -> updateNewValuesFromReselectQueryResults(rs, reselectColumns)); // newColumnValues have been updated via re-select, re-create the event's value @@ -165,12 +167,14 @@ private String getReselectQuery(List reselectColumns, Table table, Oracl * * @param ps the prepared statement * @param table the relational model table - * @param newKey the row's new key + * @param rawValues the adapter provided original values * @throws SQLException if a database error occurred */ - private void prepareReselectQueryStatement(PreparedStatement ps, Table table, Struct newKey) throws SQLException { - for (int i = 0; i < table.primaryKeyColumnNames().size(); ++i) { - ps.setObject(i + 1, newKey.get(table.primaryKeyColumnNames().get(i))); + private void prepareReselectQueryStatement(PreparedStatement ps, Table table, Object[] rawValues) throws SQLException { + final List primaryKeyColumnNames = table.primaryKeyColumnNames(); + for (int i = 0; i < primaryKeyColumnNames.size(); i++) { + final Column column = table.columnWithName(primaryKeyColumnNames.get(i)); + ps.setObject(i + 1, convertReselectPrimaryKeyColumn(ps.getConnection(), column, rawValues[column.position() - 1])); } } @@ -189,4 +193,30 @@ private void updateNewValuesFromReselectQueryResults(ResultSet rs, List } } } + + /** + * Converts the reselect query's primary key column value, if applicable. + * + * @param connection the underlying jdbc connection, should not be {@code null} + * @param column the column, should not be {@code null} + * @param value the value to be converted, may be {@code null} + * @return the converted value to be directly bound to the reselect query + */ + protected Object convertReselectPrimaryKeyColumn(Connection connection, Column column, Object value) { + return value; + } + + protected Object convertValueViaQuery(Connection connection, String value) { + try (Statement statement = connection.createStatement()) { + try (ResultSet rs = statement.executeQuery(String.format("SELECT %s FROM DUAL", value))) { + if (!rs.next()) { + throw new DebeziumException("Expected query to return a value but did not."); + } + return rs.getObject(1); + } + } + catch (SQLException e) { + throw new DebeziumException(String.format("Failed to execute reselect query for value '%s'.", value), e); + } + } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java index 7e8bcdca6..e86b88897 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java @@ -15,14 +15,12 @@ import java.sql.SQLException; import java.sql.Types; import java.time.Instant; -import java.time.LocalDateTime; import java.time.ZoneId; import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; import java.time.temporal.ChronoField; -import java.util.Locale; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -32,6 +30,7 @@ import io.debezium.DebeziumException; import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode; import io.debezium.connector.oracle.logminer.UnistrHelper; +import io.debezium.connector.oracle.util.TimestampUtils; import io.debezium.data.SpecialValueDecimal; import io.debezium.data.VariableScaleDecimal; import io.debezium.jdbc.JdbcValueConverters; @@ -72,27 +71,6 @@ public class OracleValueConverters extends JdbcValueConverters { private static final Pattern INTERVAL_DAY_SECOND_PATTERN = Pattern.compile("([+\\-])?(\\d+) (\\d+):(\\d+):(\\d+).(\\d+)"); - private static final ZoneId GMT_ZONE_ID = ZoneId.of("GMT"); - - private static final DateTimeFormatter TIMESTAMP_FORMATTER = new DateTimeFormatterBuilder() - .parseCaseInsensitive() - .appendPattern("yyyy-MM-dd HH:mm:ss") - .optionalStart() - .appendPattern(".") - .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, false) - .optionalEnd() - .toFormatter(); - - private static final DateTimeFormatter TIMESTAMP_AM_PM_SHORT_FORMATTER = new DateTimeFormatterBuilder() - .parseCaseInsensitive() - .appendPattern("dd-MMM-yy hh.mm.ss") - .optionalStart() - .appendPattern(".") - .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, false) - .optionalEnd() - .appendPattern(" a") - .toFormatter(Locale.ENGLISH); - private static final DateTimeFormatter TIMESTAMP_TZ_FORMATTER = new DateTimeFormatterBuilder() .parseCaseInsensitive() .appendPattern("yyyy-MM-dd HH:mm:ss") @@ -106,9 +84,7 @@ public class OracleValueConverters extends JdbcValueConverters { .appendOffset("+HH:MM", "") .toFormatter(); - private static final Pattern TO_TIMESTAMP = Pattern.compile("TO_TIMESTAMP\\('(.*)'\\)", Pattern.CASE_INSENSITIVE); private static final Pattern TO_TIMESTAMP_TZ = Pattern.compile("TO_TIMESTAMP_TZ\\('(.*)'\\)", Pattern.CASE_INSENSITIVE); - private static final Pattern TO_DATE = Pattern.compile("TO_DATE\\('(.*)',[ ]*'(.*)'\\)", Pattern.CASE_INSENSITIVE); private static final BigDecimal MICROSECONDS_PER_SECOND = new BigDecimal(1_000_000); private final OracleConnection connection; @@ -669,28 +645,7 @@ private Instant resolveTimestampStringAsInstant(String data) { if (isHexToRawFunctionCall(data)) { return convertHexToRawFunctionToTimestamp(data).toInstant(); } - - LocalDateTime dateTime; - final Matcher toTimestampMatcher = TO_TIMESTAMP.matcher(data); - if (toTimestampMatcher.matches()) { - String dateText = toTimestampMatcher.group(1); - if (dateText.indexOf(" AM") > 0 || dateText.indexOf(" PM") > 0) { - dateTime = LocalDateTime.from(TIMESTAMP_AM_PM_SHORT_FORMATTER.parse(dateText.trim())); - } - else { - dateTime = LocalDateTime.from(TIMESTAMP_FORMATTER.parse(dateText.trim())); - } - return dateTime.atZone(GMT_ZONE_ID).toInstant(); - } - - final Matcher toDateMatcher = TO_DATE.matcher(data); - if (toDateMatcher.matches()) { - dateTime = LocalDateTime.from(TIMESTAMP_FORMATTER.parse(toDateMatcher.group(1))); - return dateTime.atZone(GMT_ZONE_ID).toInstant(); - } - - // Unable to resolve - return null; + return TimestampUtils.convertTimestampNoZoneToInstant(data); } @Override diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerChangeRecordEmitter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerChangeRecordEmitter.java index 4a1ce878d..169b6a60a 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerChangeRecordEmitter.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerChangeRecordEmitter.java @@ -5,22 +5,34 @@ */ package io.debezium.connector.oracle.logminer; +import java.sql.Connection; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.debezium.DebeziumException; import io.debezium.connector.oracle.BaseChangeRecordEmitter; import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.OracleDatabaseSchema; import io.debezium.connector.oracle.logminer.events.EventType; +import io.debezium.connector.oracle.util.TimestampUtils; import io.debezium.data.Envelope.Operation; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.Partition; +import io.debezium.relational.Column; import io.debezium.relational.Table; import io.debezium.util.Clock; +import io.debezium.util.Strings; + +import oracle.jdbc.OracleTypes; /** * Emits change records based on an event read from Oracle LogMiner. */ public class LogMinerChangeRecordEmitter extends BaseChangeRecordEmitter { + private static final Logger LOGGER = LoggerFactory.getLogger(LogMinerChangeRecordEmitter.class); + private final Operation operation; public LogMinerChangeRecordEmitter(OracleConnectorConfig connectorConfig, Partition partition, OffsetContext offset, @@ -55,4 +67,29 @@ private static Operation getOperation(EventType eventType) { public Operation getOperation() { return operation; } + + @Override + protected Object convertReselectPrimaryKeyColumn(Connection connection, Column column, Object value) { + if (value instanceof String) { + // LogMiner raw values are always string; otherwise generally null + switch (column.jdbcType()) { + case OracleTypes.TIMESTAMP: + case OracleTypes.DATE: + final String formattedTimestamp = TimestampUtils.toSqlCompliantFunctionCall((String) value); + if (!Strings.isNullOrBlank(formattedTimestamp)) { + value = convertValueViaQuery(connection, formattedTimestamp); + } + break; + case OracleTypes.INTERVALYM: + case OracleTypes.INTERVALDS: + // LogMiner provides these values in SQL-compliant query fragments + value = convertValueViaQuery(connection, (String) value); + break; + default: + // no -op + break; + } + } + return value; + } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/olr/OpenLogReplicatorChangeRecordEmitter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/olr/OpenLogReplicatorChangeRecordEmitter.java index 86a3d5ea4..afc7e5187 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/olr/OpenLogReplicatorChangeRecordEmitter.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/olr/OpenLogReplicatorChangeRecordEmitter.java @@ -5,15 +5,20 @@ */ package io.debezium.connector.oracle.olr; +import java.sql.Connection; + import io.debezium.connector.oracle.BaseChangeRecordEmitter; import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.OracleDatabaseSchema; import io.debezium.connector.oracle.OracleOffsetContext; import io.debezium.connector.oracle.OraclePartition; import io.debezium.data.Envelope.Operation; +import io.debezium.relational.Column; import io.debezium.relational.Table; import io.debezium.util.Clock; +import oracle.jdbc.internal.OracleTypes; + /** * A change record emitter for the OpenLogReplicator streaming adapter. * @@ -21,6 +26,10 @@ */ public class OpenLogReplicatorChangeRecordEmitter extends BaseChangeRecordEmitter { + private static final String EPOCH_NANO = "TIMESTAMP'1970-01-01 00:00:00' + NUMTODSINTERVAL(%s/1000000000,'SECOND')"; + private static final String TO_DSINTERVAL = "TO_DSINTERVAL('%s')"; + private static final String TO_YMINTERVAL = "TO_YMINTERVAL('%s')"; + private final Operation operation; public OpenLogReplicatorChangeRecordEmitter(OracleConnectorConfig connectorConfig, OraclePartition partition, @@ -36,4 +45,28 @@ public Operation getOperation() { return operation; } + @Override + protected Object convertReselectPrimaryKeyColumn(Connection connection, Column column, Object value) { + switch (column.jdbcType()) { + case OracleTypes.TIMESTAMP: + case OracleTypes.DATE: + if (value instanceof Number) { + // OpenLogReplicator should be configured to provide values in nanoseconds precision. + value = convertValueViaQuery(connection, String.format(EPOCH_NANO, value)); + } + break; + case OracleTypes.INTERVALDS: + if (value instanceof String) { + // OpenLogReplicator provides this as an TO_DSINTERVAL constructor argument string. + value = convertValueViaQuery(connection, String.format(TO_DSINTERVAL, ((String) value).replace(",", " "))); + } + break; + case OracleTypes.INTERVALYM: + if (value instanceof String) { + value = convertValueViaQuery(connection, String.format(TO_YMINTERVAL, value)); + } + break; + } + return value; + } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/util/TimestampUtils.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/util/TimestampUtils.java new file mode 100644 index 000000000..40dff9c64 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/util/TimestampUtils.java @@ -0,0 +1,102 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.util; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.time.temporal.ChronoField; +import java.util.Locale; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * @author Chris Cranford + */ +public final class TimestampUtils { + + private static final ZoneId GMT_ZONE_ID = ZoneId.of("GMT"); + + private static final DateTimeFormatter TIMESTAMP_FORMATTER = new DateTimeFormatterBuilder() + .parseCaseInsensitive() + .appendPattern("yyyy-MM-dd HH:mm:ss") + .optionalStart() + .appendPattern(".") + .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, false) + .optionalEnd() + .toFormatter(); + + private static final DateTimeFormatter TIMESTAMP_AM_PM_SHORT_FORMATTER = new DateTimeFormatterBuilder() + .parseCaseInsensitive() + .appendPattern("dd-MMM-yy hh.mm.ss") + .optionalStart() + .appendPattern(".") + .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, false) + .optionalEnd() + .appendPattern(" a") + .toFormatter(Locale.ENGLISH); + + private static final Pattern TO_TIMESTAMP = Pattern.compile("TO_TIMESTAMP\\('(.*)'\\)", Pattern.CASE_INSENSITIVE); + private static final Pattern TO_DATE = Pattern.compile("TO_DATE\\('(.*)',[ ]*'(.*)'\\)", Pattern.CASE_INSENSITIVE); + + /** + * Convert the supplied timestamp without a timezone. + * + * @param value the string-value to be converted + * @return the returned converted value or {@code null} if the value could not be converted + */ + public static Instant convertTimestampNoZoneToInstant(String value) { + final Matcher toTimestampMatcher = TO_TIMESTAMP.matcher(value); + if (toTimestampMatcher.matches()) { + final LocalDateTime dateTime; + String text = toTimestampMatcher.group(1); + if (text.indexOf(" AM") > 0 || text.indexOf(" PM") > 0) { + dateTime = LocalDateTime.from(TIMESTAMP_AM_PM_SHORT_FORMATTER.parse(text.trim())); + } + else { + dateTime = LocalDateTime.from(TIMESTAMP_FORMATTER.parse(text.trim())); + } + return dateTime.atZone(GMT_ZONE_ID).toInstant(); + } + + final Matcher toDateMatcher = TO_DATE.matcher(value); + if (toDateMatcher.matches()) { + return LocalDateTime.from(TIMESTAMP_FORMATTER.parse(toDateMatcher.group(1))).atZone(GMT_ZONE_ID).toInstant(); + } + + // Unable to resolve value + return null; + } + + /** + * Converts the supplied string-value into a SQL compliant {@code TO_TIMESTAMP} string. + * + * @param value the string-value to be converted + * @return the {@code TO_TIMESTAMP} function call + */ + public static String toSqlCompliantFunctionCall(String value) { + final Matcher timestampMatcher = TO_TIMESTAMP.matcher(value); + if (timestampMatcher.matches()) { + String text = timestampMatcher.group(1); + if (text.indexOf(" AM") > 0 || text.indexOf(" PM") > 0) { + return "TO_TIMESTAMP('" + text + "', 'YYYY-MM-DD HH24:MI:SS.FF A')"; + } + return "TO_TIMESTAMP('" + text + "', 'YYYY-MM-DD HH24:MI:SS.FF')"; + } + + final Matcher dateMatcher = TO_DATE.matcher(value); + if (dateMatcher.matches()) { + // TO_DATE is already properly formatted. + return value; + } + return null; + } + + private TimestampUtils() { + } +} diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OraclePrimaryKeyLobReselectIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OraclePrimaryKeyLobReselectIT.java new file mode 100644 index 000000000..23e77734b --- /dev/null +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OraclePrimaryKeyLobReselectIT.java @@ -0,0 +1,200 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.sql.Clob; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; + +import io.debezium.config.Configuration; +import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule; +import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIs; +import io.debezium.connector.oracle.util.TestHelper; +import io.debezium.data.Envelope; +import io.debezium.data.VerifyRecord; +import io.debezium.doc.FixFor; +import io.debezium.embedded.AbstractConnectorTest; + +/** + * Integration tests when LOB is enabled and the primary key changes, to re-select LOB columns + * within the {@link BaseChangeRecordEmitter}. + * + * @author Chris Cranford + */ +public class OraclePrimaryKeyLobReselectIT extends AbstractConnectorTest { + + @Rule + public TestRule skipAdapterRule = new SkipTestDependingOnAdapterNameRule(); + + private OracleConnection connection; + + @Before + public void beforeEach() { + connection = TestHelper.testConnection(); + setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS); + initializeConnectorTestFramework(); + Files.delete(TestHelper.SCHEMA_HISTORY_PATH); + } + + @After + public void afterEach() throws Exception { + super.stopConnector(); + + if (connection != null) { + connection.close(); + } + } + + @Test + @FixFor("DBZ-7458") + public void testCharColumn() throws Exception { + testPrimaryKeyChangeReselect("char(5)", "'no'"); + } + + @Test + @FixFor("DBZ-7458") + public void testNationalizedCharColumn() throws Exception { + testPrimaryKeyChangeReselect("nchar(5)", "'no'"); + } + + @Test + @FixFor("DBZ-7458") + public void testVarchar2Column() throws Exception { + testPrimaryKeyChangeReselect("varchar2(50)", "'insert'"); + } + + @Test + @FixFor("DBZ-7458") + public void testNationalizedVarchar2Column() throws Exception { + testPrimaryKeyChangeReselect("nvarchar2(50)", "'insert'"); + } + + @Test + @FixFor("DBZ-7458") + public void testNumericColumnNotVariableScaleDecimal() throws Exception { + testPrimaryKeyChangeReselect("numeric(18,0)", "25"); + } + + @Test + @FixFor("DBZ-7458") + public void testNumeric38Column() throws Exception { + testPrimaryKeyChangeReselect("numeric(38,0)", "25"); + } + + @Test + @FixFor("DBZ-7458") + public void testFloatColumn() throws Exception { + testPrimaryKeyChangeReselect("float(38)", "25"); + } + + @Test + @FixFor("DBZ-7458") + @SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.OLR, reason = "BINARY_FLOAT not supported") + public void testBinaryFloatColumn() throws Exception { + testPrimaryKeyChangeReselect("binary_float", "25"); + } + + @Test + @FixFor("DBZ-7458") + @SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.OLR, reason = "BINARY_DOUBLE not supported") + public void testBinaryDoubleColumn() throws Exception { + testPrimaryKeyChangeReselect("binary_double", "25"); + } + + @Test + @FixFor("DBZ-7458") + public void testDateColumn() throws Exception { + testPrimaryKeyChangeReselect("date", "sysdate"); + } + + @Test + @FixFor("DBZ-7458") + public void testTimestampColumn() throws Exception { + testPrimaryKeyChangeReselect("timestamp", "current_timestamp"); + } + + @Test + @FixFor("DBZ-7458") + public void testIntervalYearToMonthColumn() throws Exception { + testPrimaryKeyChangeReselect("interval year to month", "INTERVAL '-3-6' YEAR TO MONTH"); + } + + @Test + @FixFor("DBZ-7458") + public void testIntervalDayToSecondColumn() throws Exception { + testPrimaryKeyChangeReselect("interval day(3) to second(2)", "INTERVAL '-1 2:3:4.56' DAY TO SECOND"); + } + + /** + * Types the primary-key change reselection process with a specific key column type and inserted value. + * + * Internally the method uses a composite key and the numeric {@code id} field is what is mutated, which + * triggers the LOB-based emitter reselection. + * + * @param keyType the key column type + * @param keyValue the key value to insert + */ + private void testPrimaryKeyChangeReselect(String keyType, String keyValue) throws Exception { + TestHelper.dropTable(connection, "dbz7458"); + try { + connection.execute(String.format( + "CREATE TABLE dbz7458 (id numeric(9,0), other_id %s, data clob, primary key(id, other_id))", + keyType)); + + TestHelper.streamTable(connection, "dbz7458"); + + Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.LOB_ENABLED, "true") + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.DBZ7458") + .build(); + start(OracleConnector.class, config); + assertConnectorIsRunning(); + + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + // Insert test row + final String data = RandomStringUtils.randomAlphanumeric(16384); + final Clob clob = connection.connection().createClob(); + clob.setString(1, data); + connection.prepareQuery(String.format("INSERT INTO dbz7458 (id,other_id,data) values (1,%s,?)", keyValue), + ps -> ps.setClob(1, clob), + null); + connection.commit(); + + // Update row without changing LOB + connection.execute("UPDATE dbz7458 SET id = 2 WHERE id = 1"); + + // There will be the original insert, the delete, tombstone, and the insert + // The last three are based on the primary-key update logic in BaseChangeRecordEmitter + final SourceRecords sourceRecords = consumeRecordsByTopic(4); + + final List tableRecords = sourceRecords.recordsForTopic("server1.DEBEZIUM.DBZ7458"); + assertThat(tableRecords).hasSize(4); + + // Fetch the record that will contain the reselected values for the insert + final SourceRecord insert = tableRecords.get(3); + VerifyRecord.isValidInsert(insert, "ID", 2); + + // Verify that the LOB column was re-selected + final Struct after = ((Struct) insert.value()).getStruct(Envelope.FieldName.AFTER); + assertThat(after.get("DATA")).isEqualTo(data); + } + finally { + TestHelper.dropTable(connection, "dbz7458"); + } + } +}