DBZ-5006 Add suggested test coverage

This commit is contained in:
Chris Cranford 2022-04-26 09:47:11 -04:00 committed by Chris Cranford
parent 7e845f83cb
commit 43ae067ef8

View File

@ -3566,14 +3566,28 @@ public void shouldNotRestartLogMiningSessionWithMaxSessionZero() throws Exceptio
@Test
@FixFor("DBZ-5006")
public void shouldSupportTablesWithForwardSlashes() throws Exception {
TestHelper.dropTable(connection, "\"dbz/5006\"");
// Different forward-slash scenarios
testTableWithForwardSlashes("/dbz5006", "_dbz5006");
testTableWithForwardSlashes("dbz/5006", "dbz_5006");
testTableWithForwardSlashes("dbz5006/", "dbz5006_");
testTableWithForwardSlashes("db/z50/06", "db_z50_06");
testTableWithForwardSlashes("dbz//5006", "dbz__5006");
}
private void testTableWithForwardSlashes(String tableName, String topicTableName) throws Exception {
final String quotedTableName = "\"" + tableName + "\"";
TestHelper.dropTable(connection, quotedTableName);
try {
connection.execute("CREATE TABLE \"dbz/5006\" (id numeric(9,0) primary key, data varchar2(50))");
connection.execute("INSERT INTO \"dbz/5006\" (id,data) values (1, 'Record1')");
TestHelper.streamTable(connection, "\"dbz/5006\"");
// Always want to make sure the offsets are cleared for each invocation of this sub-test
Testing.Files.delete(OFFSET_STORE_PATH);
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
connection.execute("CREATE TABLE " + quotedTableName + " (id numeric(9,0) primary key, data varchar2(50))");
connection.execute("INSERT INTO " + quotedTableName + " (id,data) values (1, 'Record1')");
TestHelper.streamTable(connection, quotedTableName);
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.dbz/5006")
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\." + tableName)
.build();
start(OracleConnector.class, config);
@ -3584,36 +3598,40 @@ public void shouldSupportTablesWithForwardSlashes() throws Exception {
// NOTE: Forward slashes are not valid in topic names, will be converted to underscore.
// The following takes this into account.
SourceRecords records = consumeRecordsByTopic(1);
List<SourceRecord> tableRecords = records.recordsForTopic("server1.DEBEZIUM.dbz_5006");
List<SourceRecord> tableRecords = records.recordsForTopic("server1.DEBEZIUM." + topicTableName);
assertThat(tableRecords).hasSize(1);
VerifyRecord.isValidRead(tableRecords.get(0), "ID", 1);
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
assertNoRecordsToConsume();
connection.execute("INSERT INTO \"dbz/5006\" (id,data) values (2,'Record2')");
connection.execute("INSERT INTO " + quotedTableName + " (id,data) values (2,'Record2')");
records = consumeRecordsByTopic(1);
tableRecords = records.recordsForTopic("server1.DEBEZIUM.dbz_5006");
tableRecords = records.recordsForTopic("server1.DEBEZIUM." + topicTableName);
assertThat(tableRecords).hasSize(1);
VerifyRecord.isValidInsert(tableRecords.get(0), "ID", 2);
connection.execute("UPDATE \"dbz/5006\" SET data = 'Record2u' WHERE id = 2");
connection.execute("UPDATE " + quotedTableName + " SET data = 'Record2u' WHERE id = 2");
records = consumeRecordsByTopic(1);
tableRecords = records.recordsForTopic("server1.DEBEZIUM.dbz_5006");
tableRecords = records.recordsForTopic("server1.DEBEZIUM." + topicTableName);
assertThat(tableRecords).hasSize(1);
VerifyRecord.isValidUpdate(tableRecords.get(0), "ID", 2);
connection.execute("DELETE \"dbz/5006\" WHERE id = 1");
connection.execute("DELETE " + quotedTableName + " WHERE id = 1");
records = consumeRecordsByTopic(2);
tableRecords = records.recordsForTopic("server1.DEBEZIUM.dbz_5006");
tableRecords = records.recordsForTopic("server1.DEBEZIUM." + topicTableName);
assertThat(tableRecords).hasSize(2);
VerifyRecord.isValidDelete(tableRecords.get(0), "ID", 1);
VerifyRecord.isValidTombstone(tableRecords.get(1));
assertNoRecordsToConsume();
}
catch (Exception e) {
throw new RuntimeException("Forward-slash test failed for table: " + tableName, e);
}
finally {
TestHelper.dropTable(connection, "\"dbz/5006\"");
stopConnector();
TestHelper.dropTable(connection, quotedTableName);
}
}