DBZ-5581 Apply unavailable value placeholder on when parsing SELECT_LOB_LOCATOR event types

This commit is contained in:
Chris Cranford 2022-09-08 10:06:12 -04:00 committed by Jiri Pechanec
parent 04809c2eb4
commit 5484948127
5 changed files with 169 additions and 15 deletions

View File

@ -6,10 +6,7 @@
package io.debezium.connector.oracle.logminer.parser;
import io.debezium.DebeziumException;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleValueConverters;
import io.debezium.connector.oracle.logminer.LogMinerHelper;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
/**
@ -141,7 +138,7 @@ private LogMinerDmlEntry parseUpdate(String sql, Table table) {
// accordingly, leaving any field's after value alone if it isn't null or a sentinel.
for (int i = 0; i < oldValues.length; ++i) {
// set unavailable value in the old values if applicable
oldValues[i] = getColumnUnavailableValue(oldValues[i], table.columns().get(i));
oldValues[i] = ParserUtils.getColumnUnavailableValue(oldValues[i], table.columns().get(i));
if (newValues[i] == NULL_SENTINEL) {
// field is explicitly set to NULL, clear the sentinel and continue
newValues[i] = null;
@ -179,10 +176,7 @@ private LogMinerDmlEntry parseDelete(String sql, Table table) {
parseWhereClause(sql, index, oldValues, table);
// Check and update unavailable column values
for (int i = 0; i < oldValues.length; ++i) {
// set unavailable value in the old values if applicable
oldValues[i] = getColumnUnavailableValue(oldValues[i], table.columns().get(i));
}
ParserUtils.setColumnUnavailableValues(oldValues, table);
return LogMinerDmlEntryImpl.forDelete(oldValues);
}
@ -665,11 +659,4 @@ else if (c == 'o' && lookAhead == 'r' && sql.indexOf(OR, index) == index) {
return index;
}
private Object getColumnUnavailableValue(Object value, Column column) {
if (value == null && OracleDatabaseSchema.isLobColumn(column)) {
return OracleValueConverters.UNAVAILABLE_VALUE;
}
return value;
}
}

View File

@ -0,0 +1,53 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.parser;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleValueConverters;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
/**
* Utility helper methods for the Oracle LogMiner DML parsing classes.
*
* @author Chris Cranford
*/
public class ParserUtils {
/**
* Set the unavailable value placeholder in the column value array for null LOB-based columns.
*
* @param columnValues the column values array, should not be {@code null}
* @param table the relational model table, should not be {@code null}
*/
public static void setColumnUnavailableValues(Object[] columnValues, Table table) {
for (int i = 0; i < columnValues.length; ++i) {
// set unavailable value in the object array if applicable
columnValues[i] = getColumnUnavailableValue(columnValues[i], table.columns().get(i));
}
}
/**
* Resolve the column value for a given column value and column instance.
* <p>
* If the column value is {@code null} and the column is an LOB-based column, this method will
* resolve the final column value as {@link OracleValueConverters#UNAVAILABLE_VALUE}, a value
* that represents that the column should be emitted with the unavailable value placeholder.
* <p>
* If the column value is not {@code null} or is not an LOB-based column, the method will
* simply return the column's value as-is without modification.
*
* @param value the column's value, may be {@code null}
* @param column the relational model's column instance, should not be {@code null}
* @return the resolved column's value
*/
public static Object getColumnUnavailableValue(Object value, Column column) {
if (value == null && OracleDatabaseSchema.isLobColumn(column)) {
return OracleValueConverters.UNAVAILABLE_VALUE;
}
return value;
}
}

View File

@ -74,6 +74,8 @@ public LogMinerDmlEntry parse(String sql, Table table) {
}
}
}
ParserUtils.setColumnUnavailableValues(columnValues, table);
}
}
}

View File

@ -1815,6 +1815,62 @@ record = recordsForTopic.get(3);
}
}
@Test
@FixFor("DBZ-5581")
public void testBlobUnavailableValuePlaceholderUpdateOnlyOneBlobColumn() throws Exception {
TestHelper.dropTable(connection, "dbz5581");
try {
connection.execute("create table dbz5581 (id numeric(9,0) primary key, a1 varchar2(200), a2 blob, a3 blob, a4 varchar2(100))");
TestHelper.streamTable(connection, "dbz5581");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5581")
.with(OracleConnectorConfig.LOB_ENABLED, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
final Blob a2 = createBlob(part(BIN_DATA, 0, 4100));
final Blob a3 = createBlob(part(BIN_DATA, 0, 4100));
connection.prepareQuery("INSERT into dbz5581 (id,a1,a2,a3,a4) values (1, 'lwmzVQd6r7', ?, ?, 'cuTVQV0OpK')", st -> {
st.setBlob(1, a2);
st.setBlob(2, a3);
}, null);
connection.commit();
final Blob a2u = createBlob(part(BIN_DATA, 1, 4101));
connection.prepareQuery("UPDATE dbz5581 set A2=? WHERE ID=1", st -> st.setBlob(1, a2u), null);
connection.commit();
SourceRecords records = consumeRecordsByTopic(2);
List<SourceRecord> recordsForTopic = records.recordsForTopic("server1.DEBEZIUM.DBZ5581");
assertThat(recordsForTopic).hasSize(2);
SourceRecord record = recordsForTopic.get(0);
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("A1")).isEqualTo("lwmzVQd6r7");
assertThat(after.get("A2")).isEqualTo(getByteBufferFromBlob(a2));
assertThat(after.get("A3")).isEqualTo(getByteBufferFromBlob(a3));
assertThat(after.get("A4")).isEqualTo("cuTVQV0OpK");
record = recordsForTopic.get(1);
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("A1")).isEqualTo("lwmzVQd6r7");
assertThat(after.get("A2")).isEqualTo(getByteBufferFromBlob(a2u));
assertThat(after.get("A3")).isEqualTo(getUnavailableValuePlaceholder(config));
assertThat(after.get("A4")).isEqualTo("cuTVQV0OpK");
}
finally {
TestHelper.dropTable(connection, "dbz5581");
}
}
private static byte[] part(byte[] buffer, int start, int length) {
return Arrays.copyOfRange(buffer, start, length);
}

View File

@ -2236,6 +2236,62 @@ record = recordsForTopic.get(3);
}
}
@Test
@FixFor("DBZ-5581")
public void testClobUnavailableValuePlaceholderUpdateOnlyOneClobColumn() throws Exception {
TestHelper.dropTable(connection, "dbz5581");
try {
connection.execute("create table dbz5581 (id numeric(9,0) primary key, a1 varchar2(200), a2 clob, a3 nclob, a4 varchar2(100))");
TestHelper.streamTable(connection, "dbz5581");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5581")
.with(OracleConnectorConfig.LOB_ENABLED, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
final Clob a2 = createClob(part(JSON_DATA, 0, 4100));
final NClob a3 = createNClob(part(JSON_DATA2, 0, 4100));
connection.prepareQuery("INSERT into dbz5581 (id,a1,a2,a3,a4) values (1, 'lwmzVQd6r7', ?, ?, 'cuTVQV0OpK')", st -> {
st.setClob(1, a2);
st.setNClob(2, a3);
}, null);
connection.commit();
final Clob a2u = createClob(part(JSON_DATA, 1, 4101));
connection.prepareQuery("UPDATE dbz5581 set A2=? WHERE ID=1", st -> st.setClob(1, a2u), null);
connection.commit();
SourceRecords records = consumeRecordsByTopic(2);
List<SourceRecord> recordsForTopic = records.recordsForTopic("server1.DEBEZIUM.DBZ5581");
assertThat(recordsForTopic).hasSize(2);
SourceRecord record = recordsForTopic.get(0);
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("A1")).isEqualTo("lwmzVQd6r7");
assertThat(after.get("A2")).isEqualTo(getClobString(a2));
assertThat(after.get("A3")).isEqualTo(getClobString(a3));
assertThat(after.get("A4")).isEqualTo("cuTVQV0OpK");
record = recordsForTopic.get(1);
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("A1")).isEqualTo("lwmzVQd6r7");
assertThat(after.get("A2")).isEqualTo(getClobString(a2u));
assertThat(after.get("A3")).isEqualTo(getUnavailableValuePlaceholder(config));
assertThat(after.get("A4")).isEqualTo("cuTVQV0OpK");
}
finally {
TestHelper.dropTable(connection, "dbz5581");
}
}
private Clob createClob(String data) throws SQLException {
Clob clob = connection.connection().createClob();
clob.setString(1, data);