diff --git a/COPYRIGHT.txt b/COPYRIGHT.txt index a5146f2e9..efc3644eb 100644 --- a/COPYRIGHT.txt +++ b/COPYRIGHT.txt @@ -157,6 +157,7 @@ Keith Barber Kevin Pullin Kewei Shang Krizhan Mariampillai +Kyley Jex Leo Mei Lev Zemlyanov Linh Nguyen Hoang diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotWithSelectOverridesIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotWithSelectOverridesIT.java index 509f63fb2..fb1e75b02 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotWithSelectOverridesIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotWithSelectOverridesIT.java @@ -135,4 +135,50 @@ public void takeSnapshotWithOverrides() throws Exception { // the ORDER BY clause should be applied, too assertThat(actualIdsForTable1.toString()).isEqualTo(expectedIdsForTable1); } + + @Test + @FixFor("DBZ-3429") + public void takeSnapshotWithOverridesWithAdditionalWhitespace() throws Exception { + final Configuration config = TestHelper.defaultConfig() + .with( + RelationalDatabaseConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, + " dbo.table1 , dbo.table3 ") + .with( + RelationalDatabaseConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + ".dbo.table1", + "SELECT * FROM [dbo].[table1] where soft_deleted = 0 order by id desc") + .with( + RelationalDatabaseConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + ".dbo.table3", + "SELECT * FROM [dbo].[table3] where soft_deleted = 0") + .build(); + + start(SqlServerConnector.class, config); + assertConnectorIsRunning(); + + SourceRecords records = consumeRecordsByTopic(INITIAL_RECORDS_PER_TABLE + (INITIAL_RECORDS_PER_TABLE + INITIAL_RECORDS_PER_TABLE) / 2); + List table1 = records.recordsForTopic("server1.dbo.table1"); + List table2 = records.recordsForTopic("server1.dbo.table2"); + List table3 = records.recordsForTopic("server1.dbo.table3"); + + // soft_deleted records should be excluded for table1 and table3 + assertThat(table1).hasSize(INITIAL_RECORDS_PER_TABLE / 2); + assertThat(table2).hasSize(INITIAL_RECORDS_PER_TABLE); + assertThat(table3).hasSize(INITIAL_RECORDS_PER_TABLE / 2); + + String expectedIdsForTable1 = "86420"; + StringBuilder actualIdsForTable1 = new StringBuilder(); + + for (int i = 0; i < INITIAL_RECORDS_PER_TABLE / 2; i++) { + SourceRecord record = table1.get(i); + + Struct key = (Struct) record.key(); + actualIdsForTable1.append(key.get("id")); + + // soft_deleted records should be excluded + Struct value = (Struct) record.value(); + assertThat(((Struct) value.get("after")).get("soft_deleted")).isEqualTo(false); + } + + // the ORDER BY clause should be applied, too + assertThat(actualIdsForTable1.toString()).isEqualTo(expectedIdsForTable1); + } } diff --git a/debezium-core/src/main/java/io/debezium/config/Configuration.java b/debezium-core/src/main/java/io/debezium/config/Configuration.java index 3d911fdbf..3493951d9 100644 --- a/debezium-core/src/main/java/io/debezium/config/Configuration.java +++ b/debezium-core/src/main/java/io/debezium/config/Configuration.java @@ -13,6 +13,7 @@ import java.net.URL; import java.time.Duration; import java.time.temporal.TemporalUnit; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -1079,6 +1080,26 @@ default List getStrings(String key, String regex) { return Collect.arrayListOf(value.split(regex)); } + /** + * Get the string value(s) associated with the given key, where the supplied regular expression is used to parse the single + * string value into multiple values. In addition, all values will be trimmed. + * + * @param field the field for the configuration property + * @param regex the delimiting regular expression + * @return the list of string values; null only if there is no such key-value pair in the configuration + * @see String#split(String) + */ + default List getTrimmedStrings(Field field, String regex) { + String value = getString(field); + if (value == null) { + return null; + } + + return Arrays.stream(value.split(regex)) + .map(String::trim) + .collect(Collectors.toList()); + } + /** * Get the integer value associated with the given key. * diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseConnectorConfig.java b/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseConnectorConfig.java index c90c5b482..6e20007e3 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseConnectorConfig.java @@ -340,7 +340,7 @@ public static DecimalHandlingMode parse(String value, String defaultValue) { .withImportance(Importance.MEDIUM) .withDescription(" This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the" + - "specific connectors . Select statements for the individual tables are " + + "specific connectors. Select statements for the individual tables are " + "specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.[DB_NAME].[TABLE_NAME]' or " + "'snapshot.select.statement.overrides.[SCHEMA_NAME].[TABLE_NAME]', respectively. " + @@ -700,7 +700,7 @@ private static int validateTableExcludeList(Configuration config, Field field, V * Returns any SELECT overrides, if present. */ public Map getSnapshotSelectOverridesByTable() { - List tableValues = getConfig().getStrings(SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, "\\s*,\\s*"); + List tableValues = getConfig().getTrimmedStrings(SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, ","); if (tableValues == null) { return Collections.emptyMap();