DBZ-7893: Account for possible # in a column name

In the case when there is a `#` in a column name it gets replaced with
the capture instance name according to how the CDC data query is built.

This patch makes the replacement more specific by changing the
replacement placeholder from `#` to `#table` in this particular case.
This commit is contained in:
Vadzim Ramanenka 2024-05-16 12:30:44 +02:00 committed by Jiri Pechanec
parent 5087b30538
commit 1045ecf604
2 changed files with 11 additions and 7 deletions

View File

@ -66,6 +66,7 @@ public class SqlServerConnection extends JdbcConnection {
private static final String STATEMENTS_PLACEHOLDER = "#";
private static final String DATABASE_NAME_PLACEHOLDER = "#db";
private static final String TABLE_NAME_PLACEHOLDER = "#table";
private static final String GET_MAX_LSN = "SELECT [#db].sys.fn_cdc_get_max_lsn()";
private static final String GET_MAX_TRANSACTION_LSN = "SELECT MAX(start_lsn) FROM [#db].cdc.lsn_time_mapping WHERE tran_id <> 0x00";
private static final String GET_NTH_TRANSACTION_LSN_FROM_BEGINNING = "SELECT MAX(start_lsn) FROM (SELECT TOP (?) start_lsn FROM [#db].cdc.lsn_time_mapping WHERE tran_id <> 0x00 ORDER BY start_lsn) as next_lsns";
@ -77,8 +78,8 @@ public class SqlServerConnection extends JdbcConnection {
protected static final String LSN_TIMESTAMP_SELECT_STATEMENT = "TODATETIMEOFFSET([#db].sys.fn_cdc_map_lsn_to_time([__$start_lsn]), DATEPART(TZOFFSET, SYSDATETIMEOFFSET()))";
private static final String GET_ALL_CHANGES_FOR_TABLE_SELECT = "SELECT [__$start_lsn], [__$seqval], [__$operation], [__$update_mask], #, "
+ LSN_TIMESTAMP_SELECT_STATEMENT;
private static final String GET_ALL_CHANGES_FOR_TABLE_FROM_FUNCTION = "FROM [#db].cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old')";
private static final String GET_ALL_CHANGES_FOR_TABLE_FROM_DIRECT = "FROM [#db].cdc.[#]";
private static final String GET_ALL_CHANGES_FOR_TABLE_FROM_FUNCTION = "FROM [#db].cdc.[fn_cdc_get_all_changes_#table](?, ?, N'all update old')";
private static final String GET_ALL_CHANGES_FOR_TABLE_FROM_DIRECT = "FROM [#db].cdc.[#table]";
private static final String GET_ALL_CHANGES_FOR_TABLE_ORDER_BY = "ORDER BY [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC";
/**
@ -366,7 +367,7 @@ public void getChangesForTables(String databaseName, SqlServerChangeTable[] chan
}
final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName)
.replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns))
.replace(STATEMENTS_PLACEHOLDER, source);
.replace(TABLE_NAME_PLACEHOLDER, source);
queries[idx] = query;
// If the table was added in the middle of queried buffer we need
// to adjust from to the first LSN available

View File

@ -128,8 +128,8 @@ public void shouldParseSpecialChars() throws Exception {
.build();
connection.execute(
"CREATE TABLE [UAT WAG CZ$Fixed Asset] (id int primary key, [my col$a] varchar(30))",
"INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(1, 'a')");
"CREATE TABLE [UAT WAG CZ$Fixed Asset] (id int primary key, [my col$a] varchar(30), [my col#b] varchar(30))",
"INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(1, 'a', 'b')");
TestHelper.enableTableCdc(connection, "UAT WAG CZ$Fixed Asset");
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
@ -145,6 +145,7 @@ public void shouldParseSpecialChars() throws Exception {
.name("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset.Value")
.field("id", Schema.INT32_SCHEMA)
.field("my_col_a", Schema.OPTIONAL_STRING_SCHEMA)
.field("my_col_b", Schema.OPTIONAL_STRING_SCHEMA)
.build());
assertSchemaMatchesStruct(
(Struct) record.key(),
@ -154,7 +155,7 @@ public void shouldParseSpecialChars() throws Exception {
.build());
assertThat(((Struct) record.value()).getStruct("after").getInt32("id")).isEqualTo(1);
connection.execute("INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(2, 'b')");
connection.execute("INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(2, 'b', 'c')");
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset")).hasSize(1);
record = records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset").get(0);
@ -165,6 +166,7 @@ record = records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset").g
.name("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset.Value")
.field("id", Schema.INT32_SCHEMA)
.field("my_col_a", Schema.OPTIONAL_STRING_SCHEMA)
.field("my_col_b", Schema.OPTIONAL_STRING_SCHEMA)
.build());
assertSchemaMatchesStruct(
(Struct) record.key(),
@ -228,7 +230,7 @@ record = records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset_Two
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
connection.execute("INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(4, 'b')");
connection.execute("INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(4, 'b', 'c')");
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset")).hasSize(1);
record = records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset").get(0);
@ -239,6 +241,7 @@ record = records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset").g
.name("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset.Value")
.field("id", Schema.INT32_SCHEMA)
.field("my_col_a", Schema.OPTIONAL_STRING_SCHEMA)
.field("my_col_b", Schema.OPTIONAL_STRING_SCHEMA)
.build());
assertSchemaMatchesStruct(
(Struct) record.key(),