diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java index 5cc2508de..523c14755 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java @@ -66,6 +66,7 @@ public class SqlServerConnection extends JdbcConnection { private static final String STATEMENTS_PLACEHOLDER = "#"; private static final String DATABASE_NAME_PLACEHOLDER = "#db"; + private static final String TABLE_NAME_PLACEHOLDER = "#table"; private static final String GET_MAX_LSN = "SELECT [#db].sys.fn_cdc_get_max_lsn()"; private static final String GET_MAX_TRANSACTION_LSN = "SELECT MAX(start_lsn) FROM [#db].cdc.lsn_time_mapping WHERE tran_id <> 0x00"; private static final String GET_NTH_TRANSACTION_LSN_FROM_BEGINNING = "SELECT MAX(start_lsn) FROM (SELECT TOP (?) start_lsn FROM [#db].cdc.lsn_time_mapping WHERE tran_id <> 0x00 ORDER BY start_lsn) as next_lsns"; @@ -77,8 +78,8 @@ public class SqlServerConnection extends JdbcConnection { protected static final String LSN_TIMESTAMP_SELECT_STATEMENT = "TODATETIMEOFFSET([#db].sys.fn_cdc_map_lsn_to_time([__$start_lsn]), DATEPART(TZOFFSET, SYSDATETIMEOFFSET()))"; private static final String GET_ALL_CHANGES_FOR_TABLE_SELECT = "SELECT [__$start_lsn], [__$seqval], [__$operation], [__$update_mask], #, " + LSN_TIMESTAMP_SELECT_STATEMENT; - private static final String GET_ALL_CHANGES_FOR_TABLE_FROM_FUNCTION = "FROM [#db].cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old')"; - private static final String GET_ALL_CHANGES_FOR_TABLE_FROM_DIRECT = "FROM [#db].cdc.[#]"; + private static final String GET_ALL_CHANGES_FOR_TABLE_FROM_FUNCTION = "FROM [#db].cdc.[fn_cdc_get_all_changes_#table](?, ?, N'all update old')"; + private static final String GET_ALL_CHANGES_FOR_TABLE_FROM_DIRECT = "FROM [#db].cdc.[#table]"; private static final String GET_ALL_CHANGES_FOR_TABLE_ORDER_BY = "ORDER BY [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC"; /** @@ -366,7 +367,7 @@ public void getChangesForTables(String databaseName, SqlServerChangeTable[] chan } final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName) .replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns)) - .replace(STATEMENTS_PLACEHOLDER, source); + .replace(TABLE_NAME_PLACEHOLDER, source); queries[idx] = query; // If the table was added in the middle of queried buffer we need // to adjust from to the first LSN available diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SpecialCharsInNamesIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SpecialCharsInNamesIT.java index b39876d23..44ed0afcb 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SpecialCharsInNamesIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SpecialCharsInNamesIT.java @@ -128,8 +128,8 @@ public void shouldParseSpecialChars() throws Exception { .build(); connection.execute( - "CREATE TABLE [UAT WAG CZ$Fixed Asset] (id int primary key, [my col$a] varchar(30))", - "INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(1, 'a')"); + "CREATE TABLE [UAT WAG CZ$Fixed Asset] (id int primary key, [my col$a] varchar(30), [my col#b] varchar(30))", + "INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(1, 'a', 'b')"); TestHelper.enableTableCdc(connection, "UAT WAG CZ$Fixed Asset"); start(SqlServerConnector.class, config); assertConnectorIsRunning(); @@ -145,6 +145,7 @@ public void shouldParseSpecialChars() throws Exception { .name("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset.Value") .field("id", Schema.INT32_SCHEMA) .field("my_col_a", Schema.OPTIONAL_STRING_SCHEMA) + .field("my_col_b", Schema.OPTIONAL_STRING_SCHEMA) .build()); assertSchemaMatchesStruct( (Struct) record.key(), @@ -154,7 +155,7 @@ public void shouldParseSpecialChars() throws Exception { .build()); assertThat(((Struct) record.value()).getStruct("after").getInt32("id")).isEqualTo(1); - connection.execute("INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(2, 'b')"); + connection.execute("INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(2, 'b', 'c')"); records = consumeRecordsByTopic(1); assertThat(records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset")).hasSize(1); record = records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset").get(0); @@ -165,6 +166,7 @@ record = records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset").g .name("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset.Value") .field("id", Schema.INT32_SCHEMA) .field("my_col_a", Schema.OPTIONAL_STRING_SCHEMA) + .field("my_col_b", Schema.OPTIONAL_STRING_SCHEMA) .build()); assertSchemaMatchesStruct( (Struct) record.key(), @@ -228,7 +230,7 @@ record = records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset_Two start(SqlServerConnector.class, config); assertConnectorIsRunning(); - connection.execute("INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(4, 'b')"); + connection.execute("INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(4, 'b', 'c')"); records = consumeRecordsByTopic(1); assertThat(records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset")).hasSize(1); record = records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset").get(0); @@ -239,6 +241,7 @@ record = records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset").g .name("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset.Value") .field("id", Schema.INT32_SCHEMA) .field("my_col_a", Schema.OPTIONAL_STRING_SCHEMA) + .field("my_col_b", Schema.OPTIONAL_STRING_SCHEMA) .build()); assertSchemaMatchesStruct( (Struct) record.key(),