DBZ-7718 Support table name escaping in BlockingSnapshotIT

This commit is contained in:
mfvitale 2024-04-02 12:52:31 +02:00 committed by Jiri Pechanec
parent ba2f893168
commit 1190389edc
5 changed files with 23 additions and 1 deletions

View File

@ -137,6 +137,11 @@ protected String signalTableName() {
return tableNameId("debezium_signal").toQuotedString('`');
}
@Override
protected String escapedTableDataCollectionId() {
return String.format("\\\\\"%s\\\\\".\\\\\"%s\\\\\"", tableNameId().catalog(), tableNameId().table());
}
@Override
protected String signalTableNameSanitized() {
return DATABASE.qualifiedTableName("debezium_signal");

View File

@ -110,6 +110,11 @@ protected String tableDataCollectionId() {
return TestHelper.getDatabaseName() + ".DEBEZIUM.A";
}
@Override
protected String escapedTableDataCollectionId() {
return "\\\"" + TestHelper.getDatabaseName() + "\\\".\\\"DEBEZIUM\\\".\\\"A\\\"";
}
@Override
protected List<String> tableDataCollectionIds() {
return List.of(TestHelper.getDatabaseName() + ".DEBEZIUM.A", TestHelper.getDatabaseName() + ".DEBEZIUM.B");

View File

@ -112,6 +112,11 @@ protected String signalTableName() {
return "s1.debezium_signal";
}
@Override
protected String escapedTableDataCollectionId() {
return "\\\"s1\\\".\\\"a\\\"";
}
@Override
protected String connector() {
return "postgres";

View File

@ -88,6 +88,11 @@ protected String signalTableName() {
return "dbo.debezium_signal";
}
@Override
protected String escapedTableDataCollectionId() {
return "\\\"testDB1\\\".\\\"dbo\\\".\\\"a\\\"";
}
@Override
protected Configuration.Builder config() {
return TestHelper.defaultConfig()

View File

@ -57,6 +57,8 @@ public abstract class AbstractBlockingSnapshotTest extends AbstractSnapshotTest
@Override
protected abstract String tableName();
protected abstract String escapedTableDataCollectionId();
@Override
protected abstract String connector();
@ -202,7 +204,7 @@ public void executeBlockingSnapshotWithEscapedCollectionName() throws Exception
SourceRecords consumedRecordsByTopic = consumeRecordsByTopic(ROW_COUNT * 2, 10);
assertRecordsFromSnapshotAndStreamingArePresent(ROW_COUNT * 2, consumedRecordsByTopic);
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "", BLOCKING, "\\\"s1\\\".\\\"a\\\"");
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "", BLOCKING, escapedTableDataCollectionId());
waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);