DBZ-3631 Fix Oracle trx reconciliation not finding PK columns

This commit is contained in:
Chris Cranford 2021-06-16 03:29:15 -04:00 committed by Gunnar Morling
parent b4ac2f9c1c
commit 29b7f1fa44
2 changed files with 81 additions and 7 deletions

View File

@ -42,6 +42,7 @@
*/
public class LogMinerDmlParser implements DmlParser {
private static final String NULL_SENTINEL = "${DBZ_NULL}";
private static final String NULL = "NULL";
private static final String INSERT_INTO = "insert into ";
private static final String UPDATE = "update ";
@ -134,13 +135,17 @@ private LogMinerDmlEntry parseUpdate(String sql, Table table) {
Object[] oldValues = new Object[table.columns().size()];
parseWhereClause(sql, index, oldValues, table);
// set after
if (!isEmptyArray(newValues)) {
// There is after state, make sure all non-provided before state is copied to after.
for (int i = 0; i < oldValues.length; ++i) {
if (newValues[i] == null && oldValues[i] != null) {
newValues[i] = oldValues[i];
}
// For each after state field that is either a NULL_SENTINEL (explicitly wants NULL) or
// that wasn't specified and therefore remained null, correctly adapt the after state
// accordingly, leaving any field's after value alone if it isn't null or a sentinel.
for (int i = 0; i < oldValues.length; ++i) {
if (newValues[i] == NULL_SENTINEL) {
// field is explicitly set to NULL, clear the sentinel and continue
newValues[i] = null;
}
else if (newValues[i] == null) {
// field wasn't specified in set-clause, copy before state to after state
newValues[i] = oldValues[i];
}
}
@ -398,6 +403,15 @@ else if (c == ')' && nested > 0) {
else if ((c == ',' || c == ' ' || c == ';') && nested == 0) {
String value = sql.substring(start, index);
if (value.equals(NULL) || value.equals(UNSUPPORTED_TYPE)) {
if (value.equals(NULL)) {
// In order to identify when a field is not present in the set-clause or when
// a field is explicitly set to null, the NULL_SENTINEL value is used to then
// indicate that the field is explicitly being cleared to NULL.
// This sentinel value will be cleared later when we reconcile before/after
// state in parseUpdate()
int position = LogMinerHelper.getColumnIndexByName(currentColumnName, table);
newValues[position] = NULL_SENTINEL;
}
start = index + 1;
inColumnValue = false;
inSpecial = false;

View File

@ -9,12 +9,14 @@
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Blob;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Struct;
@ -1003,6 +1005,64 @@ record = records.recordsForTopic(topicName("BLOB_TEST")).get(2);
assertThat(after.get("VAL_DATA")).isEqualTo("Test1U");
}
@Test
@FixFor("DBZ-3631")
public void shouldReconcileTransactionWhenAllBlobClobAreInitializedAsNull() throws Exception {
final String DDL = "CREATE TABLE dbz3631 ("
+ "ID NUMBER(38) NOT NULL,"
+ "ENTITY_ID NUMBER(38) NOT NULL,"
+ "DOCX BLOB,"
+ "DOCX_SIGNATURE BLOB,"
+ "XML_OOS BLOB,"
+ "XML_OOS_SIGNATURE BLOB,"
+ "PRIMARY KEY(ID))";
TestHelper.dropTable(connection, "dbz3631");
try {
connection.execute(DDL);
TestHelper.streamTable(connection, "dbz3631");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.DBZ3631")
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Performs an insert with several blob fields, should produce an insert/update pair
connection.executeWithoutCommitting("INSERT INTO dbz3631 ("
+ "ID,"
+ "ENTITY_ID"
+ ") VALUES ("
+ "13268281,"
+ "13340568"
+ ")");
connection.commit();
SourceRecords records = consumeRecordsByTopic(1);
List<SourceRecord> table = records.recordsForTopic("server1.DEBEZIUM.DBZ3631");
assertThat(table).hasSize(1);
SourceRecord record = table.get(0);
Struct value = (Struct) record.value();
Struct after = value.getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(BigDecimal.valueOf(13268281));
assertThat(after.get("ENTITY_ID")).isEqualTo(BigDecimal.valueOf(13340568));
assertThat(after.get("DOCX")).isNull();
assertThat(after.get("DOCX_SIGNATURE")).isNull();
assertThat(after.get("XML_OOS")).isNull();
assertThat(after.get("XML_OOS_SIGNATURE")).isNull();
assertThat(value.get(Envelope.FieldName.OPERATION)).isEqualTo(Envelope.Operation.CREATE.code());
}
finally {
TestHelper.dropTable(connection, "dbz3631");
}
}
private static byte[] part(byte[] buffer, int start, int length) {
return Arrays.copyOfRange(buffer, start, length);
}