DBZ-3429 Making option retrieval lenient towards trailing/leading whitespace

This commit is contained in:
Gunnar Morling 2021-04-28 11:01:27 +02:00
parent 99536f21d2
commit 8f891248ad
4 changed files with 70 additions and 2 deletions

View File

@ -157,6 +157,7 @@ Keith Barber
Kevin Pullin
Kewei Shang
Krizhan Mariampillai
Kyley Jex
Leo Mei
Lev Zemlyanov
Linh Nguyen Hoang

View File

@ -135,4 +135,50 @@ public void takeSnapshotWithOverrides() throws Exception {
// the ORDER BY clause should be applied, too
assertThat(actualIdsForTable1.toString()).isEqualTo(expectedIdsForTable1);
}
@Test
@FixFor("DBZ-3429")
public void takeSnapshotWithOverridesWithAdditionalWhitespace() throws Exception {
final Configuration config = TestHelper.defaultConfig()
.with(
RelationalDatabaseConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE,
" dbo.table1 , dbo.table3 ")
.with(
RelationalDatabaseConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + ".dbo.table1",
"SELECT * FROM [dbo].[table1] where soft_deleted = 0 order by id desc")
.with(
RelationalDatabaseConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + ".dbo.table3",
"SELECT * FROM [dbo].[table3] where soft_deleted = 0")
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
SourceRecords records = consumeRecordsByTopic(INITIAL_RECORDS_PER_TABLE + (INITIAL_RECORDS_PER_TABLE + INITIAL_RECORDS_PER_TABLE) / 2);
List<SourceRecord> table1 = records.recordsForTopic("server1.dbo.table1");
List<SourceRecord> table2 = records.recordsForTopic("server1.dbo.table2");
List<SourceRecord> table3 = records.recordsForTopic("server1.dbo.table3");
// soft_deleted records should be excluded for table1 and table3
assertThat(table1).hasSize(INITIAL_RECORDS_PER_TABLE / 2);
assertThat(table2).hasSize(INITIAL_RECORDS_PER_TABLE);
assertThat(table3).hasSize(INITIAL_RECORDS_PER_TABLE / 2);
String expectedIdsForTable1 = "86420";
StringBuilder actualIdsForTable1 = new StringBuilder();
for (int i = 0; i < INITIAL_RECORDS_PER_TABLE / 2; i++) {
SourceRecord record = table1.get(i);
Struct key = (Struct) record.key();
actualIdsForTable1.append(key.get("id"));
// soft_deleted records should be excluded
Struct value = (Struct) record.value();
assertThat(((Struct) value.get("after")).get("soft_deleted")).isEqualTo(false);
}
// the ORDER BY clause should be applied, too
assertThat(actualIdsForTable1.toString()).isEqualTo(expectedIdsForTable1);
}
}

View File

@ -13,6 +13,7 @@
import java.net.URL;
import java.time.Duration;
import java.time.temporal.TemporalUnit;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -1079,6 +1080,26 @@ default List<String> getStrings(String key, String regex) {
return Collect.arrayListOf(value.split(regex));
}
/**
* Get the string value(s) associated with the given key, where the supplied regular expression is used to parse the single
* string value into multiple values. In addition, all values will be trimmed.
*
* @param field the field for the configuration property
* @param regex the delimiting regular expression
* @return the list of string values; null only if there is no such key-value pair in the configuration
* @see String#split(String)
*/
default List<String> getTrimmedStrings(Field field, String regex) {
String value = getString(field);
if (value == null) {
return null;
}
return Arrays.stream(value.split(regex))
.map(String::trim)
.collect(Collectors.toList());
}
/**
* Get the integer value associated with the given key.
*

View File

@ -340,7 +340,7 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
.withImportance(Importance.MEDIUM)
.withDescription(" This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the"
+
"specific connectors . Select statements for the individual tables are " +
"specific connectors. Select statements for the individual tables are " +
"specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.[DB_NAME].[TABLE_NAME]' or "
+
"'snapshot.select.statement.overrides.[SCHEMA_NAME].[TABLE_NAME]', respectively. " +
@ -700,7 +700,7 @@ private static int validateTableExcludeList(Configuration config, Field field, V
* Returns any SELECT overrides, if present.
*/
public Map<TableId, String> getSnapshotSelectOverridesByTable() {
List<String> tableValues = getConfig().getStrings(SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, "\\s*,\\s*");
List<String> tableValues = getConfig().getTrimmedStrings(SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, ",");
if (tableValues == null) {
return Collections.emptyMap();