DBZ-2793 Add test for SQL server schema filters

This commit is contained in:
Vojtech Juranek 2022-03-24 16:55:12 +01:00 committed by Jiri Pechanec
parent cacd731f33
commit 581365e461

View File

@ -8,6 +8,8 @@
import static io.debezium.connector.sqlserver.util.TestHelper.TYPE_LENGTH_PARAMETER_KEY;
import static io.debezium.connector.sqlserver.util.TestHelper.TYPE_NAME_PARAMETER_KEY;
import static io.debezium.connector.sqlserver.util.TestHelper.TYPE_SCALE_PARAMETER_KEY;
import static io.debezium.relational.RelationalDatabaseConnectorConfig.SCHEMA_EXCLUDE_LIST;
import static io.debezium.relational.RelationalDatabaseConnectorConfig.SCHEMA_INCLUDE_LIST;
import static org.fest.assertions.Assertions.assertThat;
import static org.fest.assertions.MapAssert.entry;
import static org.junit.Assert.assertNull;
@ -55,6 +57,7 @@
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.DatabaseHistory;
@ -2569,6 +2572,101 @@ public void shouldIncludeDatabaseNameIntoTopicAndSchemaNamesInMultiPartitionMode
assertThat(record.valueSchema().name()).isEqualTo("server1.testDB.dbo.tablea.Envelope");
}
@Test
@FixFor("DBZ-2793")
public void shouldApplySchemaFilters() throws Exception {
connection.setAutoCommit(false);
String statements = "DROP TABLE IF EXISTS s1.tablea;" +
"DROP TABLE IF EXISTS s1.tableb;" +
"DROP TABLE IF EXISTS s2.tablea;" +
"DROP TABLE IF EXISTS s2.tableb;";
connection.execute(statements);
connection.execute("DROP SCHEMA IF EXISTS s1");
connection.execute("DROP SCHEMA IF EXISTS s2");
connection.execute("CREATE SCHEMA s1");
connection.execute("CREATE SCHEMA s2");
statements = "CREATE TABLE s1.tablea (id int PRIMARY KEY, vala integer);" +
"CREATE TABLE s1.tableb (id int PRIMARY KEY, valb integer);" +
"CREATE TABLE s2.tablea (id int PRIMARY KEY, vala integer);" +
"CREATE TABLE s2.tableb (id int PRIMARY KEY, valb integer);";
connection.execute(statements);
connection.setAutoCommit(true);
TestHelper.enableSchemaTableCdc(connection, new TableId(null, "s1", "tablea"));
TestHelper.enableSchemaTableCdc(connection, new TableId(null, "s1", "tableb"));
TestHelper.enableSchemaTableCdc(connection, new TableId(null, "s2", "tablea"));
TestHelper.enableSchemaTableCdc(connection, new TableId(null, "s2", "tableb"));
// Test exclude filter, s2 schema and default dbo schema should be included.
Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(SCHEMA_EXCLUDE_LIST, "s1")
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
// Wait for snapshot completion
TestHelper.waitForSnapshotToBeCompleted();
consumeRecordsByTopic(1);
connection.execute("INSERT INTO s1.tablea VALUES(1, 1)");
connection.execute("INSERT INTO s1.tableb VALUES(1, 2)");
connection.execute("INSERT INTO s2.tablea VALUES(1, 3)");
connection.execute("INSERT INTO s2.tableb VALUES(1, 4)");
connection.execute("INSERT INTO tablea VALUES(1001, 'a')");
connection.execute("INSERT INTO tableb VALUES(1001, 'b')");
SourceRecords records = consumeRecordsByTopic(4);
List<SourceRecord> tableS1A = records.recordsForTopic("server1.s1.tablea");
List<SourceRecord> tableS1B = records.recordsForTopic("server1.s1.tableb");
List<SourceRecord> tableS2A = records.recordsForTopic("server1.s2.tablea");
List<SourceRecord> tableS2B = records.recordsForTopic("server1.s2.tableb");
List<SourceRecord> tableDboA = records.recordsForTopic("server1.dbo.tablea");
List<SourceRecord> tableDboB = records.recordsForTopic("server1.dbo.tableb");
assertNull(tableS1A);
assertNull(tableS1B);
Assertions.assertThat(tableS2A).hasSize(1);
Assertions.assertThat(tableS2B).hasSize(1);
Assertions.assertThat(tableDboA).hasSize(1);
Assertions.assertThat(tableDboB).hasSize(1);
stopConnector();
// Test include filter, only s1 schema should be included.
config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(SCHEMA_INCLUDE_LIST, "s1")
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
connection.execute("INSERT INTO s2.tablea VALUES(2, 3)");
connection.execute("INSERT INTO s2.tableb VALUES(2, 4)");
connection.execute("INSERT INTO tablea VALUES(1002, 'a')");
connection.execute("INSERT INTO tableb VALUES(1002, 'a')");
connection.execute("INSERT INTO s1.tablea VALUES(2, 1)");
connection.execute("INSERT INTO s1.tableb VALUES(2, 2)");
records = consumeRecordsByTopic(2);
tableS2A = records.recordsForTopic("server1.s2.tablea");
tableS2B = records.recordsForTopic("server1.s2.tableb");
tableDboA = records.recordsForTopic("server1.dbo.tablea");
tableDboB = records.recordsForTopic("server1.dbo.tableb");
tableS1A = records.recordsForTopic("server1.s1.tablea");
tableS1B = records.recordsForTopic("server1.s1.tableb");
Assertions.assertThat(tableS1A).hasSize(1);
Assertions.assertThat(tableS1B).hasSize(1);
assertNull(tableS2A);
assertNull(tableS2B);
assertNull(tableDboA);
assertNull(tableDboB);
stopConnector();
}
private void assertRecord(Struct record, List<SchemaAndValueField> expected) {
expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record));
}