DBZ-7458 Fix PK change LOB re-query with converted values

This commit is contained in:
Chris Cranford 2024-03-17 15:23:28 -04:00 committed by Chris Cranford
parent 3a66a45632
commit 97b5e335e2
6 changed files with 409 additions and 52 deletions

View File

@ -5,9 +5,11 @@
*/
package io.debezium.connector.oracle;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@ -86,7 +88,7 @@ protected void emitUpdateAsPrimaryKeyChangeRecord(Receiver receiver, TableSchema
connection.setSessionToPdb(connectorConfig.getPdbName());
}
connection.prepareQuery(query,
ps -> prepareReselectQueryStatement(ps, table, newKey),
ps -> prepareReselectQueryStatement(ps, table, newColumnValues),
rs -> updateNewValuesFromReselectQueryResults(rs, reselectColumns));
// newColumnValues have been updated via re-select, re-create the event's value
@ -165,12 +167,14 @@ private String getReselectQuery(List<Column> reselectColumns, Table table, Oracl
*
* @param ps the prepared statement
* @param table the relational model table
* @param newKey the row's new key
* @param rawValues the adapter provided original values
* @throws SQLException if a database error occurred
*/
private void prepareReselectQueryStatement(PreparedStatement ps, Table table, Struct newKey) throws SQLException {
for (int i = 0; i < table.primaryKeyColumnNames().size(); ++i) {
ps.setObject(i + 1, newKey.get(table.primaryKeyColumnNames().get(i)));
private void prepareReselectQueryStatement(PreparedStatement ps, Table table, Object[] rawValues) throws SQLException {
    // Bind one parameter per primary key column, in primary key order; JDBC parameters are 1-based.
    int parameterIndex = 1;
    for (String keyColumnName : table.primaryKeyColumnNames()) {
        final Column keyColumn = table.columnWithName(keyColumnName);
        // Column positions are 1-based while the raw value array is 0-based.
        final Object boundValue = convertReselectPrimaryKeyColumn(ps.getConnection(), keyColumn, rawValues[keyColumn.position() - 1]);
        ps.setObject(parameterIndex, boundValue);
        parameterIndex++;
    }
}
@ -189,4 +193,30 @@ private void updateNewValuesFromReselectQueryResults(ResultSet rs, List<Column>
}
}
}
/**
* Converts the reselect query's primary key column value, if applicable.
*
* This base implementation performs no conversion and returns {@code value} unchanged; the method
* is {@code protected} so that adapter-specific emitters can override it to translate their raw
* value representation into something that can be bound to the reselect query.
*
* @param connection the underlying jdbc connection, should not be {@code null}
* @param column the column, should not be {@code null}
* @param value the value to be converted, may be {@code null}
* @return the converted value to be directly bound to the reselect query
*/
protected Object convertReselectPrimaryKeyColumn(Connection connection, Column column, Object value) {
return value;
}
/**
 * Evaluates a SQL expression by selecting it from {@code DUAL} and returns the resulting value.
 *
 * The expression is placed verbatim into the select list, so callers must only pass SQL fragments
 * that are safe to execute.
 *
 * @param connection the underlying jdbc connection used to run the query
 * @param value the SQL expression to evaluate
 * @return the first column of the first row returned by the query
 * @throws DebeziumException if the query returned no row or a database error occurred
 */
protected Object convertValueViaQuery(Connection connection, String value) {
    final String sql = String.format("SELECT %s FROM DUAL", value);
    try (Statement statement = connection.createStatement();
            ResultSet rs = statement.executeQuery(sql)) {
        if (rs.next()) {
            return rs.getObject(1);
        }
        throw new DebeziumException("Expected query to return a value but did not.");
    }
    catch (SQLException e) {
        throw new DebeziumException(String.format("Failed to execute reselect query for value '%s'.", value), e);
    }
}
}

View File

@ -15,14 +15,12 @@
import java.sql.SQLException;
import java.sql.Types;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -32,6 +30,7 @@
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
import io.debezium.connector.oracle.logminer.UnistrHelper;
import io.debezium.connector.oracle.util.TimestampUtils;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.jdbc.JdbcValueConverters;
@ -72,27 +71,6 @@ public class OracleValueConverters extends JdbcValueConverters {
private static final Pattern INTERVAL_DAY_SECOND_PATTERN = Pattern.compile("([+\\-])?(\\d+) (\\d+):(\\d+):(\\d+).(\\d+)");
private static final ZoneId GMT_ZONE_ID = ZoneId.of("GMT");
private static final DateTimeFormatter TIMESTAMP_FORMATTER = new DateTimeFormatterBuilder()
.parseCaseInsensitive()
.appendPattern("yyyy-MM-dd HH:mm:ss")
.optionalStart()
.appendPattern(".")
.appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, false)
.optionalEnd()
.toFormatter();
private static final DateTimeFormatter TIMESTAMP_AM_PM_SHORT_FORMATTER = new DateTimeFormatterBuilder()
.parseCaseInsensitive()
.appendPattern("dd-MMM-yy hh.mm.ss")
.optionalStart()
.appendPattern(".")
.appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, false)
.optionalEnd()
.appendPattern(" a")
.toFormatter(Locale.ENGLISH);
private static final DateTimeFormatter TIMESTAMP_TZ_FORMATTER = new DateTimeFormatterBuilder()
.parseCaseInsensitive()
.appendPattern("yyyy-MM-dd HH:mm:ss")
@ -106,9 +84,7 @@ public class OracleValueConverters extends JdbcValueConverters {
.appendOffset("+HH:MM", "")
.toFormatter();
private static final Pattern TO_TIMESTAMP = Pattern.compile("TO_TIMESTAMP\\('(.*)'\\)", Pattern.CASE_INSENSITIVE);
private static final Pattern TO_TIMESTAMP_TZ = Pattern.compile("TO_TIMESTAMP_TZ\\('(.*)'\\)", Pattern.CASE_INSENSITIVE);
private static final Pattern TO_DATE = Pattern.compile("TO_DATE\\('(.*)',[ ]*'(.*)'\\)", Pattern.CASE_INSENSITIVE);
private static final BigDecimal MICROSECONDS_PER_SECOND = new BigDecimal(1_000_000);
private final OracleConnection connection;
@ -669,28 +645,7 @@ private Instant resolveTimestampStringAsInstant(String data) {
if (isHexToRawFunctionCall(data)) {
return convertHexToRawFunctionToTimestamp(data).toInstant();
}
LocalDateTime dateTime;
final Matcher toTimestampMatcher = TO_TIMESTAMP.matcher(data);
if (toTimestampMatcher.matches()) {
String dateText = toTimestampMatcher.group(1);
if (dateText.indexOf(" AM") > 0 || dateText.indexOf(" PM") > 0) {
dateTime = LocalDateTime.from(TIMESTAMP_AM_PM_SHORT_FORMATTER.parse(dateText.trim()));
}
else {
dateTime = LocalDateTime.from(TIMESTAMP_FORMATTER.parse(dateText.trim()));
}
return dateTime.atZone(GMT_ZONE_ID).toInstant();
}
final Matcher toDateMatcher = TO_DATE.matcher(data);
if (toDateMatcher.matches()) {
dateTime = LocalDateTime.from(TIMESTAMP_FORMATTER.parse(toDateMatcher.group(1)));
return dateTime.atZone(GMT_ZONE_ID).toInstant();
}
// Unable to resolve
return null;
return TimestampUtils.convertTimestampNoZoneToInstant(data);
}
@Override

View File

@ -5,22 +5,34 @@
*/
package io.debezium.connector.oracle.logminer;
import java.sql.Connection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.connector.oracle.BaseChangeRecordEmitter;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.logminer.events.EventType;
import io.debezium.connector.oracle.util.TimestampUtils;
import io.debezium.data.Envelope.Operation;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
import oracle.jdbc.OracleTypes;
/**
* Emits change records based on an event read from Oracle LogMiner.
*/
public class LogMinerChangeRecordEmitter extends BaseChangeRecordEmitter<Object> {
private static final Logger LOGGER = LoggerFactory.getLogger(LogMinerChangeRecordEmitter.class);
private final Operation operation;
public LogMinerChangeRecordEmitter(OracleConnectorConfig connectorConfig, Partition partition, OffsetContext offset,
@ -55,4 +67,29 @@ private static Operation getOperation(EventType eventType) {
public Operation getOperation() {
return operation;
}
/**
 * Converts a LogMiner-provided raw primary key value into a value that can be bound to the
 * reselect query. Only string values of date, timestamp and interval columns are converted;
 * everything else is returned unchanged.
 */
@Override
protected Object convertReselectPrimaryKeyColumn(Connection connection, Column column, Object value) {
    // LogMiner raw values are always string; otherwise generally null
    if (!(value instanceof String)) {
        return value;
    }
    final String rawValue = (String) value;
    switch (column.jdbcType()) {
        case OracleTypes.TIMESTAMP:
        case OracleTypes.DATE: {
            final String functionCall = TimestampUtils.toSqlCompliantFunctionCall(rawValue);
            if (Strings.isNullOrBlank(functionCall)) {
                // The raw value was not recognized; bind it unchanged
                return value;
            }
            return convertValueViaQuery(connection, functionCall);
        }
        case OracleTypes.INTERVALYM:
        case OracleTypes.INTERVALDS:
            // LogMiner provides these values in SQL-compliant query fragments
            return convertValueViaQuery(connection, rawValue);
        default:
            // no-op
            return value;
    }
}
}

View File

@ -5,15 +5,20 @@
*/
package io.debezium.connector.oracle.olr;
import java.sql.Connection;
import io.debezium.connector.oracle.BaseChangeRecordEmitter;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.data.Envelope.Operation;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.util.Clock;
import oracle.jdbc.internal.OracleTypes;
/**
* A change record emitter for the OpenLogReplicator streaming adapter.
*
@ -21,6 +26,10 @@
*/
public class OpenLogReplicatorChangeRecordEmitter extends BaseChangeRecordEmitter<Object> {
private static final String EPOCH_NANO = "TIMESTAMP'1970-01-01 00:00:00' + NUMTODSINTERVAL(%s/1000000000,'SECOND')";
private static final String TO_DSINTERVAL = "TO_DSINTERVAL('%s')";
private static final String TO_YMINTERVAL = "TO_YMINTERVAL('%s')";
private final Operation operation;
public OpenLogReplicatorChangeRecordEmitter(OracleConnectorConfig connectorConfig, OraclePartition partition,
@ -36,4 +45,28 @@ public Operation getOperation() {
return operation;
}
/**
 * Converts an OpenLogReplicator-provided raw primary key value into a value that can be bound to
 * the reselect query. Numeric date/timestamp values and string interval values are converted by
 * evaluating a SQL expression; everything else is returned unchanged.
 */
@Override
protected Object convertReselectPrimaryKeyColumn(Connection connection, Column column, Object value) {
    final int jdbcType = column.jdbcType();
    if (jdbcType == OracleTypes.TIMESTAMP || jdbcType == OracleTypes.DATE) {
        if (value instanceof Number) {
            // OpenLogReplicator should be configured to provide values in nanoseconds precision.
            return convertValueViaQuery(connection, String.format(EPOCH_NANO, value));
        }
    }
    else if (jdbcType == OracleTypes.INTERVALDS) {
        if (value instanceof String) {
            // OpenLogReplicator provides this as a TO_DSINTERVAL constructor argument string.
            final String argument = ((String) value).replace(",", " ");
            return convertValueViaQuery(connection, String.format(TO_DSINTERVAL, argument));
        }
    }
    else if (jdbcType == OracleTypes.INTERVALYM) {
        if (value instanceof String) {
            return convertValueViaQuery(connection, String.format(TO_YMINTERVAL, value));
        }
    }
    return value;
}
}

View File

@ -0,0 +1,102 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.util;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * Utility methods for working with Oracle {@code TO_TIMESTAMP(...)} and {@code TO_DATE(...)}
 * function-call strings that carry no time zone information.
 *
 * @author Chris Cranford
 */
public final class TimestampUtils {

    private static final ZoneId GMT_ZONE_ID = ZoneId.of("GMT");

    /** Oracle format mask equivalent to {@link #TIMESTAMP_FORMATTER}. */
    private static final String ORACLE_TIMESTAMP_FORMAT = "YYYY-MM-DD HH24:MI:SS.FF";

    /**
     * Oracle format mask equivalent to {@link #TIMESTAMP_AM_PM_SHORT_FORMATTER}. A 12-hour {@code HH}
     * element is required here because {@code HH24} cannot be combined with a meridian indicator.
     */
    private static final String ORACLE_TIMESTAMP_AM_PM_SHORT_FORMAT = "DD-MON-YY HH.MI.SS.FF AM";

    // Parses "2024-03-17 15:23:28" with an optional fractional second part.
    private static final DateTimeFormatter TIMESTAMP_FORMATTER = new DateTimeFormatterBuilder()
            .parseCaseInsensitive()
            .appendPattern("yyyy-MM-dd HH:mm:ss")
            .optionalStart()
            .appendPattern(".")
            .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, false)
            .optionalEnd()
            .toFormatter();

    // Parses "17-MAR-24 03.23.28 PM" with an optional fractional second part before the AM/PM marker.
    private static final DateTimeFormatter TIMESTAMP_AM_PM_SHORT_FORMATTER = new DateTimeFormatterBuilder()
            .parseCaseInsensitive()
            .appendPattern("dd-MMM-yy hh.mm.ss")
            .optionalStart()
            .appendPattern(".")
            .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, false)
            .optionalEnd()
            .appendPattern(" a")
            .toFormatter(Locale.ENGLISH);

    private static final Pattern TO_TIMESTAMP = Pattern.compile("TO_TIMESTAMP\\('(.*)'\\)", Pattern.CASE_INSENSITIVE);
    private static final Pattern TO_DATE = Pattern.compile("TO_DATE\\('(.*)',[ ]*'(.*)'\\)", Pattern.CASE_INSENSITIVE);

    /**
     * Convert the supplied timestamp without a timezone. The parsed date and time are interpreted
     * in the GMT zone.
     *
     * @param value the string-value to be converted, may be {@code null}
     * @return the returned converted value or {@code null} if the value could not be converted
     * @throws java.time.format.DateTimeParseException if the value is a recognized function call
     *         whose text argument cannot be parsed
     */
    public static Instant convertTimestampNoZoneToInstant(String value) {
        if (value == null) {
            return null;
        }

        final Matcher toTimestampMatcher = TO_TIMESTAMP.matcher(value);
        if (toTimestampMatcher.matches()) {
            final String text = toTimestampMatcher.group(1);
            final DateTimeFormatter formatter = hasMeridianIndicator(text)
                    ? TIMESTAMP_AM_PM_SHORT_FORMATTER
                    : TIMESTAMP_FORMATTER;
            return LocalDateTime.from(formatter.parse(text.trim())).atZone(GMT_ZONE_ID).toInstant();
        }

        final Matcher toDateMatcher = TO_DATE.matcher(value);
        if (toDateMatcher.matches()) {
            return LocalDateTime.from(TIMESTAMP_FORMATTER.parse(toDateMatcher.group(1))).atZone(GMT_ZONE_ID).toInstant();
        }

        // Unable to resolve value
        return null;
    }

    /**
     * Converts the supplied string-value into a SQL compliant function call that can be evaluated
     * by the database.
     *
     * A single-argument {@code TO_TIMESTAMP('...')} is rewritten with an explicit format mask that
     * matches the shape of its text argument; a {@code TO_DATE('...', '...')} call already carries
     * its format mask and is returned unchanged.
     *
     * @param value the string-value to be converted, may be {@code null}
     * @return the function call, or {@code null} if the value is {@code null} or not recognized
     */
    public static String toSqlCompliantFunctionCall(String value) {
        if (value == null) {
            return null;
        }

        final Matcher timestampMatcher = TO_TIMESTAMP.matcher(value);
        if (timestampMatcher.matches()) {
            final String text = timestampMatcher.group(1);
            // The mask must describe the same text layout that the matching Java formatter parses.
            final String format = hasMeridianIndicator(text)
                    ? ORACLE_TIMESTAMP_AM_PM_SHORT_FORMAT
                    : ORACLE_TIMESTAMP_FORMAT;
            return "TO_TIMESTAMP('" + text + "', '" + format + "')";
        }

        final Matcher dateMatcher = TO_DATE.matcher(value);
        if (dateMatcher.matches()) {
            // TO_DATE is already properly formatted.
            return value;
        }

        return null;
    }

    /**
     * Checks whether the timestamp text carries an {@code AM} or {@code PM} marker after its first character.
     */
    private static boolean hasMeridianIndicator(String text) {
        return text.indexOf(" AM") > 0 || text.indexOf(" PM") > 0;
    }

    private TimestampUtils() {
    }
}

View File

@ -0,0 +1,200 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle;
import static org.assertj.core.api.Assertions.assertThat;
import java.sql.Clob;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIs;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.Envelope;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
/**
* Integration tests when LOB is enabled and the primary key changes, to re-select LOB columns
* within the {@link BaseChangeRecordEmitter}.
*
* Each test case exercises a different column type for the second column of a composite primary key.
*
* @author Chris Cranford
*/
public class OraclePrimaryKeyLobReselectIT extends AbstractConnectorTest {
@Rule
public TestRule skipAdapterRule = new SkipTestDependingOnAdapterNameRule();
private OracleConnection connection;
@Before
public void beforeEach() {
connection = TestHelper.testConnection();
setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
initializeConnectorTestFramework();
Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
}
@After
public void afterEach() throws Exception {
super.stopConnector();
if (connection != null) {
connection.close();
}
}
@Test
@FixFor("DBZ-7458")
public void testCharColumn() throws Exception {
testPrimaryKeyChangeReselect("char(5)", "'no'");
}
@Test
@FixFor("DBZ-7458")
public void testNationalizedCharColumn() throws Exception {
testPrimaryKeyChangeReselect("nchar(5)", "'no'");
}
@Test
@FixFor("DBZ-7458")
public void testVarchar2Column() throws Exception {
testPrimaryKeyChangeReselect("varchar2(50)", "'insert'");
}
@Test
@FixFor("DBZ-7458")
public void testNationalizedVarchar2Column() throws Exception {
testPrimaryKeyChangeReselect("nvarchar2(50)", "'insert'");
}
@Test
@FixFor("DBZ-7458")
public void testNumericColumnNotVariableScaleDecimal() throws Exception {
testPrimaryKeyChangeReselect("numeric(18,0)", "25");
}
@Test
@FixFor("DBZ-7458")
public void testNumeric38Column() throws Exception {
testPrimaryKeyChangeReselect("numeric(38,0)", "25");
}
@Test
@FixFor("DBZ-7458")
public void testFloatColumn() throws Exception {
testPrimaryKeyChangeReselect("float(38)", "25");
}
@Test
@FixFor("DBZ-7458")
@SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.OLR, reason = "BINARY_FLOAT not supported")
public void testBinaryFloatColumn() throws Exception {
testPrimaryKeyChangeReselect("binary_float", "25");
}
@Test
@FixFor("DBZ-7458")
@SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.OLR, reason = "BINARY_DOUBLE not supported")
public void testBinaryDoubleColumn() throws Exception {
testPrimaryKeyChangeReselect("binary_double", "25");
}
@Test
@FixFor("DBZ-7458")
public void testDateColumn() throws Exception {
testPrimaryKeyChangeReselect("date", "sysdate");
}
@Test
@FixFor("DBZ-7458")
public void testTimestampColumn() throws Exception {
testPrimaryKeyChangeReselect("timestamp", "current_timestamp");
}
@Test
@FixFor("DBZ-7458")
public void testIntervalYearToMonthColumn() throws Exception {
testPrimaryKeyChangeReselect("interval year to month", "INTERVAL '-3-6' YEAR TO MONTH");
}
@Test
@FixFor("DBZ-7458")
public void testIntervalDayToSecondColumn() throws Exception {
testPrimaryKeyChangeReselect("interval day(3) to second(2)", "INTERVAL '-1 2:3:4.56' DAY TO SECOND");
}
/**
* Tests the primary-key change reselection process with a specific key column type and inserted value.
*
* Internally the method uses a composite key and the numeric {@code id} field is what is mutated, which
* triggers the LOB-based emitter reselection.
*
* @param keyType the key column type, used verbatim in the {@code CREATE TABLE} statement
* @param keyValue the key value to insert, used verbatim as a SQL expression in the {@code INSERT} statement
* @throws Exception if the table setup, connector startup or record consumption fails
*/
private void testPrimaryKeyChangeReselect(String keyType, String keyValue) throws Exception {
TestHelper.dropTable(connection, "dbz7458");
try {
// Composite primary key (id, other_id) where other_id uses the column type under test
connection.execute(String.format(
"CREATE TABLE dbz7458 (id numeric(9,0), other_id %s, data clob, primary key(id, other_id))",
keyType));
TestHelper.streamTable(connection, "dbz7458");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.LOB_ENABLED, "true")
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.DBZ7458")
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Insert test row
final String data = RandomStringUtils.randomAlphanumeric(16384);
final Clob clob = connection.connection().createClob();
clob.setString(1, data);
connection.prepareQuery(String.format("INSERT INTO dbz7458 (id,other_id,data) values (1,%s,?)", keyValue),
ps -> ps.setClob(1, clob),
null);
connection.commit();
// Update row without changing LOB
connection.execute("UPDATE dbz7458 SET id = 2 WHERE id = 1");
// There will be the original insert, the delete, tombstone, and the insert
// The last three are based on the primary-key update logic in BaseChangeRecordEmitter
final SourceRecords sourceRecords = consumeRecordsByTopic(4);
final List<SourceRecord> tableRecords = sourceRecords.recordsForTopic("server1.DEBEZIUM.DBZ7458");
assertThat(tableRecords).hasSize(4);
// Fetch the record that will contain the reselected values for the insert
final SourceRecord insert = tableRecords.get(3);
VerifyRecord.isValidInsert(insert, "ID", 2);
// Verify that the LOB column was re-selected
final Struct after = ((Struct) insert.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("DATA")).isEqualTo(data);
}
finally {
// Always drop the table so later test cases start from a clean state
TestHelper.dropTable(connection, "dbz7458");
}
}
}