From 1045ecf6049d65c13b0c17cd8f48fa991e000b16 Mon Sep 17 00:00:00 2001 From: Vadzim Ramanenka Date: Thu, 16 May 2024 12:30:44 +0200 Subject: [PATCH] DBZ-7893: Account for possible `#` in a column name In the case when there is a `#` in a column name it gets replaced with the capture instance name according to how the CDC data query is built. This patch makes the replacement more specific by changing the replacement placeholder from `#` to `#table` in this particular case. --- .../connector/sqlserver/SqlServerConnection.java | 7 ++++--- .../connector/sqlserver/SpecialCharsInNamesIT.java | 11 +++++++---- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java index 5cc2508de..523c14755 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java @@ -66,6 +66,7 @@ public class SqlServerConnection extends JdbcConnection { private static final String STATEMENTS_PLACEHOLDER = "#"; private static final String DATABASE_NAME_PLACEHOLDER = "#db"; + private static final String TABLE_NAME_PLACEHOLDER = "#table"; private static final String GET_MAX_LSN = "SELECT [#db].sys.fn_cdc_get_max_lsn()"; private static final String GET_MAX_TRANSACTION_LSN = "SELECT MAX(start_lsn) FROM [#db].cdc.lsn_time_mapping WHERE tran_id <> 0x00"; private static final String GET_NTH_TRANSACTION_LSN_FROM_BEGINNING = "SELECT MAX(start_lsn) FROM (SELECT TOP (?) start_lsn FROM [#db].cdc.lsn_time_mapping WHERE tran_id <> 0x00 ORDER BY start_lsn) as next_lsns"; @@ -77,8 +78,8 @@ public class SqlServerConnection extends JdbcConnection { protected static final String LSN_TIMESTAMP_SELECT_STATEMENT = "TODATETIMEOFFSET([#db].sys.fn_cdc_map_lsn_to_time([__$start_lsn]), DATEPART(TZOFFSET, SYSDATETIMEOFFSET()))"; private static final String GET_ALL_CHANGES_FOR_TABLE_SELECT = "SELECT [__$start_lsn], [__$seqval], [__$operation], [__$update_mask], #, " + LSN_TIMESTAMP_SELECT_STATEMENT; - private static final String GET_ALL_CHANGES_FOR_TABLE_FROM_FUNCTION = "FROM [#db].cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old')"; - private static final String GET_ALL_CHANGES_FOR_TABLE_FROM_DIRECT = "FROM [#db].cdc.[#]"; + private static final String GET_ALL_CHANGES_FOR_TABLE_FROM_FUNCTION = "FROM [#db].cdc.[fn_cdc_get_all_changes_#table](?, ?, N'all update old')"; + private static final String GET_ALL_CHANGES_FOR_TABLE_FROM_DIRECT = "FROM [#db].cdc.[#table]"; private static final String GET_ALL_CHANGES_FOR_TABLE_ORDER_BY = "ORDER BY [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC"; /** @@ -366,7 +367,7 @@ public void getChangesForTables(String databaseName, SqlServerChangeTable[] chan } final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName) .replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns)) - .replace(STATEMENTS_PLACEHOLDER, source); + .replace(TABLE_NAME_PLACEHOLDER, source); queries[idx] = query; // If the table was added in the middle of queried buffer we need // to adjust from to the first LSN available diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SpecialCharsInNamesIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SpecialCharsInNamesIT.java index b39876d23..44ed0afcb 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SpecialCharsInNamesIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SpecialCharsInNamesIT.java @@ -128,8 +128,8 @@ public void shouldParseSpecialChars() throws Exception { .build(); connection.execute( - "CREATE TABLE [UAT WAG CZ$Fixed Asset] (id int primary key, [my col$a] varchar(30))", - "INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(1, 'a')"); + "CREATE TABLE [UAT WAG CZ$Fixed Asset] (id int primary key, [my col$a] varchar(30), [my col#b] varchar(30))", + "INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(1, 'a', 'b')"); TestHelper.enableTableCdc(connection, "UAT WAG CZ$Fixed Asset"); start(SqlServerConnector.class, config); assertConnectorIsRunning(); @@ -145,6 +145,7 @@ public void shouldParseSpecialChars() throws Exception { .name("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset.Value") .field("id", Schema.INT32_SCHEMA) .field("my_col_a", Schema.OPTIONAL_STRING_SCHEMA) + .field("my_col_b", Schema.OPTIONAL_STRING_SCHEMA) .build()); assertSchemaMatchesStruct( (Struct) record.key(), @@ -154,7 +155,7 @@ public void shouldParseSpecialChars() throws Exception { .build()); assertThat(((Struct) record.value()).getStruct("after").getInt32("id")).isEqualTo(1); - connection.execute("INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(2, 'b')"); + connection.execute("INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(2, 'b', 'c')"); records = consumeRecordsByTopic(1); assertThat(records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset")).hasSize(1); record = records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset").get(0); @@ -165,6 +166,7 @@ record = records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset").g .name("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset.Value") .field("id", Schema.INT32_SCHEMA) .field("my_col_a", Schema.OPTIONAL_STRING_SCHEMA) + .field("my_col_b", Schema.OPTIONAL_STRING_SCHEMA) .build()); assertSchemaMatchesStruct( (Struct) record.key(), @@ -228,7 +230,7 @@ record = records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset_Two start(SqlServerConnector.class, config); assertConnectorIsRunning(); - connection.execute("INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(4, 'b')"); + connection.execute("INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(4, 'b', 'c')"); records = consumeRecordsByTopic(1); assertThat(records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset")).hasSize(1); record = records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset").get(0); @@ -239,6 +241,7 @@ record = records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset").g .name("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset.Value") .field("id", Schema.INT32_SCHEMA) .field("my_col_a", Schema.OPTIONAL_STRING_SCHEMA) + .field("my_col_b", Schema.OPTIONAL_STRING_SCHEMA) .build()); assertSchemaMatchesStruct( (Struct) record.key(),