From e90196989be5efc3d45fdabbf2daf7928b2d1ada Mon Sep 17 00:00:00 2001 From: Sergei Morozov Date: Fri, 30 Apr 2021 14:52:02 -0700 Subject: [PATCH] DBZ-3505: Better handling of invalid SQL Server connector configuration --- .../SqlServerSnapshotChangeEventSource.java | 9 ++++++++- .../sqlserver/SqlServerConnectorIT.java | 19 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java index efc38bf9f..d13389f42 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java @@ -253,9 +253,16 @@ protected String enhanceOverriddenSelect(RelationalSnapshotContext snapshotConte private String checkExcludedColumns(TableId tableId) { Table table = sqlServerDatabaseSchema.tableFor(tableId); - return table.retrieveColumnNames().stream() + List columnNames = table.retrieveColumnNames().stream() .filter(columnName -> filterChangeTableColumns(tableId, columnName)) .filter(columnName -> connectorConfig.getColumnFilter().matches(tableId.catalog(), tableId.schema(), tableId.table(), columnName)) + .collect(Collectors.toList()); + + if (columnNames.isEmpty()) { + throw new IllegalArgumentException("Filtered column list for table " + tableId + " is empty"); + } + + return columnNames.stream() .map(columnName -> { StringBuilder sb = new StringBuilder(); if (!columnName.contains(tableId.table())) { diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index 2b6511940..29ed11258 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -1221,6 +1221,25 @@ public void testColumnIncludeList() throws Exception { stopConnector(); } + @Test + @FixFor("DBZ-3505") + public void shouldFailOnInvalidColumnFilter() throws Exception { + final Configuration config = TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.COLUMN_INCLUDE_LIST, ".^") + .build(); + final LogInterceptor logInterceptor = new LogInterceptor(); + + start(SqlServerConnector.class, config); + waitForConnectorShutdown("sql_server", "server1"); + + consumeRecord(); + Awaitility.await() + .alias("Found error message in logs") + .atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS) + .until(() -> logInterceptor.containsStacktraceElement("Filtered column list for table testDB.dbo.tablea is empty") + && !engine.isRunning()); + } + @Test @FixFor("DBZ-1692") public void shouldConsumeEventsWithMaskedHashedColumns() throws Exception {