DBZ-3505: Better handling of invalid SQL Server connector configuration

This commit is contained in:
Sergei Morozov 2021-04-30 14:52:02 -07:00 committed by Jiri Pechanec
parent 7066ca7300
commit e90196989b
2 changed files with 27 additions and 1 deletions

View File

@ -253,9 +253,16 @@ protected String enhanceOverriddenSelect(RelationalSnapshotContext snapshotConte
private String checkExcludedColumns(TableId tableId) {
Table table = sqlServerDatabaseSchema.tableFor(tableId);
return table.retrieveColumnNames().stream()
List<String> columnNames = table.retrieveColumnNames().stream()
.filter(columnName -> filterChangeTableColumns(tableId, columnName))
.filter(columnName -> connectorConfig.getColumnFilter().matches(tableId.catalog(), tableId.schema(), tableId.table(), columnName))
.collect(Collectors.toList());
if (columnNames.isEmpty()) {
throw new IllegalArgumentException("Filtered column list for table " + tableId + " is empty");
}
return columnNames.stream()
.map(columnName -> {
StringBuilder sb = new StringBuilder();
if (!columnName.contains(tableId.table())) {

View File

@ -1221,6 +1221,25 @@ public void testColumnIncludeList() throws Exception {
stopConnector();
}
@Test
@FixFor("DBZ-3505")
public void shouldFailOnInvalidColumnFilter() throws Exception {
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.COLUMN_INCLUDE_LIST, ".^")
.build();
final LogInterceptor logInterceptor = new LogInterceptor();
start(SqlServerConnector.class, config);
waitForConnectorShutdown("sql_server", "server1");
consumeRecord();
Awaitility.await()
.alias("Found error message in logs")
.atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS)
.until(() -> logInterceptor.containsStacktraceElement("Filtered column list for table testDB.dbo.tablea is empty")
&& !engine.isRunning());
}
@Test
@FixFor("DBZ-1692")
public void shouldConsumeEventsWithMaskedHashedColumns() throws Exception {