diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index 29ab283ed..9d883a3b3 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -8,6 +8,8 @@ import static io.debezium.connector.sqlserver.util.TestHelper.TYPE_LENGTH_PARAMETER_KEY; import static io.debezium.connector.sqlserver.util.TestHelper.TYPE_NAME_PARAMETER_KEY; import static io.debezium.connector.sqlserver.util.TestHelper.TYPE_SCALE_PARAMETER_KEY; +import static io.debezium.relational.RelationalDatabaseConnectorConfig.SCHEMA_EXCLUDE_LIST; +import static io.debezium.relational.RelationalDatabaseConnectorConfig.SCHEMA_INCLUDE_LIST; import static org.fest.assertions.Assertions.assertThat; import static org.fest.assertions.MapAssert.entry; import static org.junit.Assert.assertNull; @@ -55,6 +57,7 @@ import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.RelationalDatabaseSchema; import io.debezium.relational.RelationalSnapshotChangeEventSource; +import io.debezium.relational.TableId; import io.debezium.relational.Tables; import io.debezium.relational.ddl.DdlParser; import io.debezium.relational.history.DatabaseHistory; @@ -2569,6 +2572,101 @@ public void shouldIncludeDatabaseNameIntoTopicAndSchemaNamesInMultiPartitionMode assertThat(record.valueSchema().name()).isEqualTo("server1.testDB.dbo.tablea.Envelope"); } + @Test + @FixFor("DBZ-2793") + public void shouldApplySchemaFilters() throws Exception { + connection.setAutoCommit(false); + String statements = "DROP TABLE IF EXISTS s1.tablea;" + + "DROP TABLE IF EXISTS s1.tableb;" + + "DROP TABLE IF EXISTS s2.tablea;" + + "DROP TABLE IF EXISTS s2.tableb;"; + connection.execute(statements); + connection.execute("DROP SCHEMA IF EXISTS s1"); + connection.execute("DROP SCHEMA IF EXISTS s2"); + connection.execute("CREATE SCHEMA s1"); + connection.execute("CREATE SCHEMA s2"); + statements = "CREATE TABLE s1.tablea (id int PRIMARY KEY, vala integer);" + + "CREATE TABLE s1.tableb (id int PRIMARY KEY, valb integer);" + + "CREATE TABLE s2.tablea (id int PRIMARY KEY, vala integer);" + + "CREATE TABLE s2.tableb (id int PRIMARY KEY, valb integer);"; + connection.execute(statements); + connection.setAutoCommit(true); + TestHelper.enableSchemaTableCdc(connection, new TableId(null, "s1", "tablea")); + TestHelper.enableSchemaTableCdc(connection, new TableId(null, "s1", "tableb")); + TestHelper.enableSchemaTableCdc(connection, new TableId(null, "s2", "tablea")); + TestHelper.enableSchemaTableCdc(connection, new TableId(null, "s2", "tableb")); + + // Test exclude filter, s2 schema and default dbo schema should be included. + Configuration config = TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) + .with(SCHEMA_EXCLUDE_LIST, "s1") + .build(); + + start(SqlServerConnector.class, config); + assertConnectorIsRunning(); + + // Wait for snapshot completion + TestHelper.waitForSnapshotToBeCompleted(); + consumeRecordsByTopic(1); + + connection.execute("INSERT INTO s1.tablea VALUES(1, 1)"); + connection.execute("INSERT INTO s1.tableb VALUES(1, 2)"); + connection.execute("INSERT INTO s2.tablea VALUES(1, 3)"); + connection.execute("INSERT INTO s2.tableb VALUES(1, 4)"); + connection.execute("INSERT INTO tablea VALUES(1001, 'a')"); + connection.execute("INSERT INTO tableb VALUES(1001, 'b')"); + + SourceRecords records = consumeRecordsByTopic(4); + List tableS1A = records.recordsForTopic("server1.s1.tablea"); + List tableS1B = records.recordsForTopic("server1.s1.tableb"); + List tableS2A = records.recordsForTopic("server1.s2.tablea"); + List tableS2B = records.recordsForTopic("server1.s2.tableb"); + List tableDboA = records.recordsForTopic("server1.dbo.tablea"); + List tableDboB = records.recordsForTopic("server1.dbo.tableb"); + + assertNull(tableS1A); + assertNull(tableS1B); + Assertions.assertThat(tableS2A).hasSize(1); + Assertions.assertThat(tableS2B).hasSize(1); + Assertions.assertThat(tableDboA).hasSize(1); + Assertions.assertThat(tableDboB).hasSize(1); + + stopConnector(); + + // Test include filter, only s1 schema should be included. + config = TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) + .with(SCHEMA_INCLUDE_LIST, "s1") + .build(); + + start(SqlServerConnector.class, config); + assertConnectorIsRunning(); + + connection.execute("INSERT INTO s2.tablea VALUES(2, 3)"); + connection.execute("INSERT INTO s2.tableb VALUES(2, 4)"); + connection.execute("INSERT INTO tablea VALUES(1002, 'a')"); + connection.execute("INSERT INTO tableb VALUES(1002, 'a')"); + connection.execute("INSERT INTO s1.tablea VALUES(2, 1)"); + connection.execute("INSERT INTO s1.tableb VALUES(2, 2)"); + + records = consumeRecordsByTopic(2); + tableS2A = records.recordsForTopic("server1.s2.tablea"); + tableS2B = records.recordsForTopic("server1.s2.tableb"); + tableDboA = records.recordsForTopic("server1.dbo.tablea"); + tableDboB = records.recordsForTopic("server1.dbo.tableb"); + tableS1A = records.recordsForTopic("server1.s1.tablea"); + tableS1B = records.recordsForTopic("server1.s1.tableb"); + + Assertions.assertThat(tableS1A).hasSize(1); + Assertions.assertThat(tableS1B).hasSize(1); + assertNull(tableS2A); + assertNull(tableS2B); + assertNull(tableDboA); + assertNull(tableDboB); + + stopConnector(); + } + private void assertRecord(Struct record, List expected) { expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record)); }