DBZ-5295 Add delete row test case

This commit is contained in:
Chris Cranford 2022-07-11 12:44:19 -04:00 committed by Jiri Pechanec
parent 843f133d4d
commit 35e3eae52f
2 changed files with 139 additions and 0 deletions

View File

@ -1746,6 +1746,75 @@ record = recordsForTopic.get(2);
}
}
@Test
@FixFor("DBZ-5295")
public void shouldReselectBlobAfterPrimaryKeyChangeWithRowDeletion() throws Exception {
TestHelper.dropTable(connection, "dbz5295");
try {
connection.execute("create table dbz5295 (id numeric(9,0) primary key, data blob)");
TestHelper.streamTable(connection, "dbz5295");
Blob blob = createBlob(part(BIN_DATA, 0, 1024));
connection.prepareQuery("INSERT INTO dbz5295 (id,data) values (1,?)", ps -> ps.setBlob(1, blob), null);
connection.commit();
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5295")
.with(OracleConnectorConfig.LOB_ENABLED, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
SourceRecords records = consumeRecordsByTopic(1);
List<SourceRecord> recordsForTopic = records.recordsForTopic(topicName("DBZ5295"));
assertThat(recordsForTopic).hasSize(1);
SourceRecord record = recordsForTopic.get(0);
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo(getByteBufferFromBlob(blob));
// Update the PK and then delete the row within the same transaction
connection.executeWithoutCommitting("UPDATE dbz5295 set id = 2 where id = 1");
connection.execute("DELETE FROM dbz5295 where id = 2");
// The update of the primary key causes a DELETE and a CREATE, mingled with a TOMBSTONE
records = consumeRecordsByTopic(4);
recordsForTopic = records.recordsForTopic(topicName("DBZ5295"));
assertThat(recordsForTopic).hasSize(4);
// First event: DELETE
record = recordsForTopic.get(0);
VerifyRecord.isValidDelete(record, "ID", 1);
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after).isNull();
// Second event: TOMBSTONE
record = recordsForTopic.get(1);
VerifyRecord.isValidTombstone(record);
// Third event: CREATE
record = recordsForTopic.get(2);
VerifyRecord.isValidInsert(record, "ID", 2);
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(2);
assertThat(after.get("DATA")).isEqualTo(getUnavailableValuePlaceholder(config));
// Fourth event: DELETE
record = recordsForTopic.get(3);
VerifyRecord.isValidDelete(record, "ID", 2);
Struct before = ((Struct) record.value()).getStruct(Envelope.FieldName.BEFORE);
assertThat(before.get("ID")).isEqualTo(2);
assertThat(before.get("DATA")).isEqualTo(getUnavailableValuePlaceholder(config));
}
finally {
TestHelper.dropTable(connection, "dbz5295");
}
}
private static byte[] part(byte[] buffer, int start, int length) {
return Arrays.copyOfRange(buffer, start, length);
}

View File

@ -2166,6 +2166,76 @@ record = recordsForTopic.get(2);
}
}
@Test
@FixFor("DBZ-5295")
public void shouldReselectClobAfterPrimaryKeyChangeWithRowDeletion() throws Exception {
TestHelper.dropTable(connection, "dbz5295");
try {
connection.execute("create table dbz5295 (id numeric(9,0) primary key, data clob, data2 clob)");
TestHelper.streamTable(connection, "dbz5295");
connection.execute("INSERT INTO dbz5295 (id,data,data2) values (1,'Small clob data','Data2')");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5295")
.with(OracleConnectorConfig.LOB_ENABLED, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
SourceRecords records = consumeRecordsByTopic(1);
List<SourceRecord> recordsForTopic = records.recordsForTopic(topicName("DBZ5295"));
assertThat(recordsForTopic).hasSize(1);
SourceRecord record = recordsForTopic.get(0);
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo("Small clob data");
assertThat(after.get("DATA2")).isEqualTo("Data2");
// Update the PK and then delete the row within the same transaction
connection.executeWithoutCommitting("UPDATE dbz5295 set id = 2 where id = 1");
connection.execute("DELETE FROM dbz5295 where id = 2");
// The update of the primary key causes a DELETE and a CREATE, mingled with a TOMBSTONE
records = consumeRecordsByTopic(4);
recordsForTopic = records.recordsForTopic(topicName("DBZ5295"));
assertThat(recordsForTopic).hasSize(4);
// First event: DELETE
record = recordsForTopic.get(0);
VerifyRecord.isValidDelete(record, "ID", 1);
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after).isNull();
// Second event: TOMBSTONE
record = recordsForTopic.get(1);
VerifyRecord.isValidTombstone(record);
// Third event: CREATE
record = recordsForTopic.get(2);
VerifyRecord.isValidInsert(record, "ID", 2);
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(2);
assertThat(after.get("DATA")).isEqualTo(getUnavailableValuePlaceholder(config));
assertThat(after.get("DATA2")).isEqualTo(getUnavailableValuePlaceholder(config));
// Fourth event: DELETE
record = recordsForTopic.get(3);
VerifyRecord.isValidDelete(record, "ID", 2);
Struct before = ((Struct) record.value()).getStruct(Envelope.FieldName.BEFORE);
assertThat(before.get("ID")).isEqualTo(2);
assertThat(before.get("DATA")).isEqualTo(getUnavailableValuePlaceholder(config));
assertThat(before.get("DATA2")).isEqualTo(getUnavailableValuePlaceholder(config));
}
finally {
TestHelper.dropTable(connection, "dbz5295");
}
}
private Clob createClob(String data) throws SQLException {
Clob clob = connection.connection().createClob();
clob.setString(1, data);