diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleBlobDataTypesIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleBlobDataTypesIT.java index c6a6d67b5..de3759c91 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleBlobDataTypesIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleBlobDataTypesIT.java @@ -1746,6 +1746,75 @@ record = recordsForTopic.get(2); } } + @Test + @FixFor("DBZ-5295") + public void shouldReselectBlobAfterPrimaryKeyChangeWithRowDeletion() throws Exception { + TestHelper.dropTable(connection, "dbz5295"); + try { + connection.execute("create table dbz5295 (id numeric(9,0) primary key, data blob)"); + TestHelper.streamTable(connection, "dbz5295"); + + Blob blob = createBlob(part(BIN_DATA, 0, 1024)); + connection.prepareQuery("INSERT INTO dbz5295 (id,data) values (1,?)", ps -> ps.setBlob(1, blob), null); + connection.commit(); + + Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5295") + .with(OracleConnectorConfig.LOB_ENABLED, true) + .build(); + + start(OracleConnector.class, config); + assertConnectorIsRunning(); + + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + SourceRecords records = consumeRecordsByTopic(1); + List recordsForTopic = records.recordsForTopic(topicName("DBZ5295")); + assertThat(recordsForTopic).hasSize(1); + + SourceRecord record = recordsForTopic.get(0); + Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER); + assertThat(after.get("ID")).isEqualTo(1); + assertThat(after.get("DATA")).isEqualTo(getByteBufferFromBlob(blob)); + + // Update the PK and then delete the row within the same transaction + connection.executeWithoutCommitting("UPDATE dbz5295 set id = 2 where id = 1"); + connection.execute("DELETE FROM dbz5295 where id = 2"); + + // The update of the primary key causes a DELETE and a CREATE, mingled with a TOMBSTONE + records = consumeRecordsByTopic(4); + recordsForTopic = records.recordsForTopic(topicName("DBZ5295")); + assertThat(recordsForTopic).hasSize(4); + + // First event: DELETE + record = recordsForTopic.get(0); + VerifyRecord.isValidDelete(record, "ID", 1); + after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER); + assertThat(after).isNull(); + + // Second event: TOMBSTONE + record = recordsForTopic.get(1); + VerifyRecord.isValidTombstone(record); + + // Third event: CREATE + record = recordsForTopic.get(2); + VerifyRecord.isValidInsert(record, "ID", 2); + after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER); + assertThat(after.get("ID")).isEqualTo(2); + assertThat(after.get("DATA")).isEqualTo(getUnavailableValuePlaceholder(config)); + + // Fourth event: DELETE + record = recordsForTopic.get(3); + VerifyRecord.isValidDelete(record, "ID", 2); + Struct before = ((Struct) record.value()).getStruct(Envelope.FieldName.BEFORE); + assertThat(before.get("ID")).isEqualTo(2); + assertThat(before.get("DATA")).isEqualTo(getUnavailableValuePlaceholder(config)); + } + finally { + TestHelper.dropTable(connection, "dbz5295"); + } + } + private static byte[] part(byte[] buffer, int start, int length) { return Arrays.copyOfRange(buffer, start, length); } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleClobDataTypeIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleClobDataTypeIT.java index 594096f68..3bcf06ccc 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleClobDataTypeIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleClobDataTypeIT.java @@ -2166,6 +2166,76 @@ record = recordsForTopic.get(2); } } + @Test + @FixFor("DBZ-5295") + public void shouldReselectClobAfterPrimaryKeyChangeWithRowDeletion() throws Exception { + TestHelper.dropTable(connection, "dbz5295"); + try { + connection.execute("create table dbz5295 (id numeric(9,0) primary key, data clob, data2 clob)"); + TestHelper.streamTable(connection, "dbz5295"); + + connection.execute("INSERT INTO dbz5295 (id,data,data2) values (1,'Small clob data','Data2')"); + + Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5295") + .with(OracleConnectorConfig.LOB_ENABLED, true) + .build(); + + start(OracleConnector.class, config); + assertConnectorIsRunning(); + + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + SourceRecords records = consumeRecordsByTopic(1); + List recordsForTopic = records.recordsForTopic(topicName("DBZ5295")); + assertThat(recordsForTopic).hasSize(1); + + SourceRecord record = recordsForTopic.get(0); + Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER); + assertThat(after.get("ID")).isEqualTo(1); + assertThat(after.get("DATA")).isEqualTo("Small clob data"); + assertThat(after.get("DATA2")).isEqualTo("Data2"); + + // Update the PK and then delete the row within the same transaction + connection.executeWithoutCommitting("UPDATE dbz5295 set id = 2 where id = 1"); + connection.execute("DELETE FROM dbz5295 where id = 2"); + + // The update of the primary key causes a DELETE and a CREATE, mingled with a TOMBSTONE + records = consumeRecordsByTopic(4); + recordsForTopic = records.recordsForTopic(topicName("DBZ5295")); + assertThat(recordsForTopic).hasSize(4); + + // First event: DELETE + record = recordsForTopic.get(0); + VerifyRecord.isValidDelete(record, "ID", 1); + after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER); + assertThat(after).isNull(); + + // Second event: TOMBSTONE + record = recordsForTopic.get(1); + VerifyRecord.isValidTombstone(record); + + // Third event: CREATE + record = recordsForTopic.get(2); + VerifyRecord.isValidInsert(record, "ID", 2); + after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER); + assertThat(after.get("ID")).isEqualTo(2); + assertThat(after.get("DATA")).isEqualTo(getUnavailableValuePlaceholder(config)); + assertThat(after.get("DATA2")).isEqualTo(getUnavailableValuePlaceholder(config)); + + // Fourth event: DELETE + record = recordsForTopic.get(3); + VerifyRecord.isValidDelete(record, "ID", 2); + Struct before = ((Struct) record.value()).getStruct(Envelope.FieldName.BEFORE); + assertThat(before.get("ID")).isEqualTo(2); + assertThat(before.get("DATA")).isEqualTo(getUnavailableValuePlaceholder(config)); + assertThat(before.get("DATA2")).isEqualTo(getUnavailableValuePlaceholder(config)); + } + finally { + TestHelper.dropTable(connection, "dbz5295"); + } + } + private Clob createClob(String data) throws SQLException { Clob clob = connection.connection().createClob(); clob.setString(1, data);