diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java index efc38bf9f..d13389f42 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java @@ -253,9 +253,16 @@ protected String enhanceOverriddenSelect(RelationalSnapshotContext snapshotConte private String checkExcludedColumns(TableId tableId) { Table table = sqlServerDatabaseSchema.tableFor(tableId); - return table.retrieveColumnNames().stream() + List columnNames = table.retrieveColumnNames().stream() .filter(columnName -> filterChangeTableColumns(tableId, columnName)) .filter(columnName -> connectorConfig.getColumnFilter().matches(tableId.catalog(), tableId.schema(), tableId.table(), columnName)) + .collect(Collectors.toList()); + + if (columnNames.isEmpty()) { + throw new IllegalArgumentException("Filtered column list for table " + tableId + " is empty"); + } + + return columnNames.stream() .map(columnName -> { StringBuilder sb = new StringBuilder(); if (!columnName.contains(tableId.table())) { diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index 2b6511940..29ed11258 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -1221,6 +1221,25 @@ public void testColumnIncludeList() throws Exception { stopConnector(); } + @Test + @FixFor("DBZ-3505") + public void shouldFailOnInvalidColumnFilter() throws Exception { + final Configuration config = TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.COLUMN_INCLUDE_LIST, ".^") + .build(); + final LogInterceptor logInterceptor = new LogInterceptor(); + + start(SqlServerConnector.class, config); + waitForConnectorShutdown("sql_server", "server1"); + + consumeRecord(); + Awaitility.await() + .alias("Found error message in logs") + .atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS) + .until(() -> logInterceptor.containsStacktraceElement("Filtered column list for table testDB.dbo.tablea is empty") + && !engine.isRunning()); + } + @Test @FixFor("DBZ-1692") public void shouldConsumeEventsWithMaskedHashedColumns() throws Exception {