DBZ-683 Snapshot supports storing only whitelisted tables setting

This commit is contained in:
Jiri Pechanec 2019-05-03 10:46:42 +02:00 committed by Gunnar Morling
parent 1603ebda89
commit 540196091c
12 changed files with 147 additions and 31 deletions

View File

@ -315,7 +315,7 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem
// Note that, unlike with the DB history topic, we don't filter out non-whitelisted tables here
// (which writes to the public schema change topic); if required, a second option could be added
// for controlling this, too
if (!storeOnlyMonitoredTablesDdl || !changes.isEmpty()) {
if (!storeOnlyMonitoredTablesDdl || ddlChanges.anyMatch(filters.databaseFilter(), filters.tableFilter())) {
if (statementConsumer != null) {
// We are supposed to _also_ record the schema changes as SourceRecords, but these need to be filtered
@ -349,17 +349,13 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem
// Record the DDL statement so that we can later recover them if needed. We do this _after_ writing the
// schema change records so that failure recovery (which is based on of the history) won't lose
// schema change records.
try {
if (!storeOnlyMonitoredTablesDdl || changes.stream().anyMatch(filters().tableFilter()::test)) {
dbHistory.record(source.partition(), source.offset(), databaseName, ddlStatements);
} else {
logger.debug("Changes for DDL '{}' were filtered and not recorded in database history", ddlStatements);
}
} catch (Throwable e) {
throw new ConnectException(
"Error recording the DDL statement(s) in the database history " + dbHistory + ": " + ddlStatements, e);
if (!storeOnlyMonitoredTablesDdl || changes.stream().anyMatch(filters().tableFilter()::test)) {
dbHistory.record(source.partition(), source.offset(), databaseName, ddlStatements);
}
}
else {
logger.debug("Changes for DDL '{}' were filtered and not recorded in database history", ddlStatements);
}
}
// Figure out what changed ...
@ -374,4 +370,12 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem
});
return true;
}
/**
* @return true if only monitored tables should be stored in database history, false if all tables should be stored
*/
public boolean isStoreOnlyMonitoredTablesDdl() {
return storeOnlyMonitoredTablesDdl;
}
}

View File

@ -302,12 +302,12 @@ protected void execute() {
mysql.query(sql.get(), rs -> {
while (rs.next() && isRunning()) {
TableId id = new TableId(dbName, null, rs.getString(1));
if (createTableFilters.tableFilter().test(id)) {
if ((createTableFilters == filters && shouldRecordTableSchema(schema, filters, id)) || createTableFilters.tableFilter().test(id)) {
createTablesMap.computeIfAbsent(dbName, k -> new ArrayList<>()).add(id);
}
if (filters.tableFilter().test(id)) {
if (shouldRecordTableSchema(schema, filters, id)) {
tableIds.add(id);
logger.info("\t including '{}'", id);
logger.info("\t including '{}' for further processing", id);
} else {
logger.info("\t '{}' is filtered out, discarding", id);
}
@ -481,6 +481,10 @@ protected void execute() {
Iterator<TableId> tableIdIter = tableIds.iterator();
while (tableIdIter.hasNext()) {
TableId tableId = tableIdIter.next();
if (!filters.tableFilter().test(tableId)) {
// Table schema was recorded but the table is filtered out so will not be snapshotted
continue;
}
AtomicLong rowNum = new AtomicLong();
if (!isRunning()) {
break;
@ -730,6 +734,10 @@ protected void execute() {
}
}
private boolean shouldRecordTableSchema(final MySqlSchema schema, final Filters filters, TableId id) {
return !schema.isStoreOnlyMonitoredTablesDdl() || filters.tableFilter().test(id);
}
protected void readBinlogPosition(int step, SourceInfo source, JdbcConnection mysql, AtomicReference<String> sql) throws SQLException {
if (context.isSchemaOnlyRecoverySnapshot()) {
// We are in schema only recovery mode, use the existing binlog position

View File

@ -35,6 +35,9 @@ public void enterUseStatement(MySqlParser.UseStatementContext ctx) {
// right for the current database.
String charsetForDb = parser.charsetNameForDatabase().get(dbName);
parser.systemVariables().setVariable(MySqlSystemVariables.MySqlScope.SESSION, MySqlSystemVariables.CHARSET_NAME_DATABASE, charsetForDb);
// Signal that the variable was set ...
parser.signalUseDatabase(ctx);
super.enterUseStatement(ctx);
}
}

View File

@ -1385,7 +1385,7 @@ public void shouldParseStatementsWithQuotedIdentifiers() {
parser.parse(readFile("ddl/mysql-quoted.ddl"), tables);
Testing.print(tables);
assertThat(tables.size()).isEqualTo(4);
assertThat(listener.total()).isEqualTo(10);
assertThat(listener.total()).isEqualTo(11);
assertThat(tables.forTable("connector_test_ro", null, "products")).isNotNull();
assertThat(tables.forTable("connector_test_ro", null, "products_on_hand")).isNotNull();
assertThat(tables.forTable("connector_test_ro", null, "customers")).isNotNull();
@ -1397,7 +1397,7 @@ public void shouldParseIntegrationTestSchema() {
parser.parse(readFile("ddl/mysql-integration.ddl"), tables);
Testing.print(tables);
assertThat(tables.size()).isEqualTo(10);
assertThat(listener.total()).isEqualTo(17);
assertThat(listener.total()).isEqualTo(20);
}
@Test
@ -1690,7 +1690,7 @@ public void shouldParseAlterTableThatChangesMultipleColumns() {
public void shouldParseTicketMonsterLiquibaseStatements() {
parser.parse(readLines(1, "ddl/mysql-ticketmonster-liquibase.ddl"), tables);
assertThat(tables.size()).isEqualTo(7);
assertThat(listener.total()).isEqualTo(16);
assertThat(listener.total()).isEqualTo(17);
listener.forEach(this::printEvent);
}

View File

@ -699,7 +699,8 @@ public void shouldUseOverriddenSelectStatementDuringSnapshotting() throws SQLExc
.with(MySqlConnectorConfig.DATABASE_WHITELIST, DATABASE.getDatabaseName())
.with(MySqlConnectorConfig.TABLE_WHITELIST, DATABASE.getDatabaseName() + ".products")
.with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, DATABASE.getDatabaseName()+".products")
.with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE+"."+DATABASE.getDatabaseName()+".products", String.format("SELECT * from %s.products where id>=108 order by id", DATABASE.getDatabaseName()))
.with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + DATABASE.getDatabaseName()+".products", String.format("SELECT * from %s.products where id>=108 order by id", DATABASE.getDatabaseName()))
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true)
.with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
@ -747,6 +748,7 @@ public void shouldUseMultipleOverriddenSelectStatementsDuringSnapshotting() thro
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
.with(MySqlConnectorConfig.DATABASE_WHITELIST, DATABASE.getDatabaseName())
.with(MySqlConnectorConfig.TABLE_WHITELIST, tables)
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true)
.with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, tables)
.with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE+"."+DATABASE.getDatabaseName()+".products", String.format("SELECT * from %s.products where id>=108 order by id", DATABASE.getDatabaseName()))
.with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE+"."+DATABASE.getDatabaseName()+".products_on_hand", String.format("SELECT * from %s.products_on_hand where product_id>=108 order by product_id", DATABASE.getDatabaseName()))
@ -796,8 +798,8 @@ public void shouldIgnoreAlterTableForNonCapturedTablesNotStoredInHistory() throw
// Consume the first records due to startup and initialization of the database ...
// Testing.Print.enable();
SourceRecords records = consumeRecordsByTopic(2);
assertThat(records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).size()).isEqualTo(2);
SourceRecords records = consumeRecordsByTopic(1 + 5);
assertThat(records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).size()).isEqualTo(5);
try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());) {
try (JdbcConnection connection = db.connect()) {
@ -843,12 +845,12 @@ public void shouldProcessCreateUniqueIndex() throws SQLException, InterruptedExc
}
}
SourceRecords records = consumeRecordsByTopic(8);
SourceRecords records = consumeRecordsByTopic(16);
final List<SourceRecord> migrationTestRecords = records.recordsForTopic(DATABASE.topicForTable("migration_test"));
assertThat(migrationTestRecords.size()).isEqualTo(1);
final SourceRecord record = migrationTestRecords.get(0);
assertThat(((Struct) record.key()).getString("mgb_no")).isEqualTo("2");
assertThat(records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).size()).isEqualTo(6);
assertThat(records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).size()).isEqualTo(14);
stopConnector();
}
@ -865,13 +867,15 @@ public void shouldIgnoreAlterTableForNonCapturedTablesStoredInHistory() throws S
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.build();
dropDatabases();
// Start the connector ...
start(MySqlConnector.class, config);
// Consume the first records due to startup and initialization of the database ...
// Testing.Print.enable();
SourceRecords records = consumeRecordsByTopic(6);
assertThat(records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).size()).isEqualTo(5);
SourceRecords records = consumeRecordsByTopic(1 + 1 + 2 + 2 * 4);
assertThat(records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).size()).isEqualTo(1 + 2 + 2 * 4);
try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());) {
try (JdbcConnection connection = db.connect()) {
@ -914,8 +918,8 @@ public void shouldIgnoreCreateIndexForNonCapturedTablesNotStoredInHistory() thro
// Consume the first records due to startup and initialization of the database ...
// Testing.Print.enable();
SourceRecords records = consumeRecordsByTopic(2);
assertThat(records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).size()).isEqualTo(2);
SourceRecords records = consumeRecordsByTopic(6);
assertThat(records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).size()).isEqualTo(5);
try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());) {
try (JdbcConnection connection = db.connect()) {
@ -930,6 +934,57 @@ public void shouldIgnoreCreateIndexForNonCapturedTablesNotStoredInHistory() thro
Assertions.assertThat(record.topic()).isEqualTo(DATABASE.topicForTable("customers"));
}
@Test
@FixFor("DBZ-683")
public void shouldReceiveSchemaForNonWhitelistedTablesAndDatabases() throws SQLException, InterruptedException {
Testing.Files.delete(DB_HISTORY_PATH);
final String tables = String.format("%s.customers,%s.orders", DATABASE.getDatabaseName(), DATABASE.getDatabaseName());
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.TABLE_WHITELIST, tables)
.with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(MySqlConnectorConfig.DATABASE_WHITELIST, ".*")
.build();
dropDatabases();
try (MySQLConnection db = MySQLConnection.forTestDatabase("mysql");) {
try (JdbcConnection connection = db.connect()) {
connection.execute(
"CREATE DATABASE non_wh",
"USE non_wh",
"CREATE TABLE t1 (ID INT PRIMARY KEY)"
);
}
}
// Start the connector ...
start(MySqlConnector.class, config);
// Consume the first records due to startup and initialization of the database ...
// Testing.Print.enable();
// Four tables in database, only two are whitelisted
SourceRecords records = consumeRecordsByTopic(1 + 3 + 2 * 4 + 3 + 2);
assertThat(records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).size()).isEqualTo(1 + 2 + 2 * 4);
stopConnector();
}
private void dropDatabases() throws SQLException {
try (MySQLConnection db = MySQLConnection.forTestDatabase("mysql");) {
try (JdbcConnection connection = db.connect()) {
connection.query("SHOW DATABASES", rs -> {
while (rs.next()) {
final String dbName = rs.getString(1);
if (!Filters.isBuiltInDatabase(dbName) && !dbName.equals(DATABASE.getDatabaseName())) {
connection.execute("DROP DATABASE IF EXISTS " + dbName);
}
}
});
}
}
}
protected static class BinlogPosition {
private String binlogFilename;
private long binlogPosition;

View File

@ -38,6 +38,7 @@
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.time.ZonedTimestamp;
import io.debezium.util.Testing;
@ -860,6 +861,7 @@ public void shouldConsumeDecimalAsStringFromSnapshot() throws SQLException, Inte
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.TABLE_WHITELIST, DATABASE.qualifiedTableName("dbz_147_decimalvalues"))
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true)
.with(MySqlConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.STRING)
.build();
// Start the connector ...

View File

@ -43,7 +43,7 @@ public class MySqlGeometryIT extends AbstractConnectorTest {
@Before
public void beforeEach() {
stopConnector();
databaseDifferences = databaseGeoDifferences(MySQLConnection.forTestDatabase("emptydb").getMySqlVersion());
databaseDifferences = databaseGeoDifferences(MySQLConnection.forTestDatabase("mysql").getMySqlVersion());
DATABASE = new UniqueDatabase("geometryit", databaseDifferences.geometryDatabaseName())
.withDbHistoryPath(DB_HISTORY_PATH);

View File

@ -36,6 +36,7 @@
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.Timestamp;
import io.debezium.time.ZonedTimestamp;
@ -517,6 +518,7 @@ public void dateAndTimeTest() throws InterruptedException {
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
.with(MySqlConnectorConfig.TABLE_WHITELIST, DATABASE.qualifiedTableName("DATE_TIME_TABLE"))
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true)
.build();
start(MySqlConnector.class, config);
@ -582,6 +584,7 @@ public void timeTypeWithConnectMode() throws InterruptedException {
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
.with(MySqlConnectorConfig.TABLE_WHITELIST, DATABASE.qualifiedTableName("DATE_TIME_TABLE"))
.with(MySqlConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.CONNECT)
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true)
.build();
start(MySqlConnector.class, config);

View File

@ -125,7 +125,11 @@ protected void signalChangeEvent(DdlParserListener.Event event) {
}
protected void signalSetVariable(String variableName, String variableValue, String statement) {
signalChangeEvent(new DdlParserListener.SetVariableEvent(variableName, variableValue, statement));
signalChangeEvent(new DdlParserListener.SetVariableEvent(variableName, variableValue, currentSchema, statement));
}
protected void signalUseDatabase(String statement) {
signalChangeEvent(new DdlParserListener.DatabaseSwitchedEvent(currentSchema, statement));
}
/**

View File

@ -9,8 +9,10 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.relational.TableId;
/**
* A {@link DdlParserListener} that accumulates changes, allowing them to be consumed in the same order by database.
@ -129,10 +131,12 @@ protected String getDatabase(Event event) {
case CREATE_DATABASE:
case ALTER_DATABASE:
case DROP_DATABASE:
case USE_DATABASE:
DatabaseEvent dbEvent = (DatabaseEvent) event;
return dbEvent.databaseName();
case SET_VARIABLE:
return "";
SetVariableEvent varEvent = (SetVariableEvent) event;
return varEvent.databaseName().orElse("");
}
assert false : "Should never happen";
return null;
@ -162,4 +166,15 @@ public static interface DatabaseStatementConsumer {
public static interface DatabaseStatementStringConsumer {
void consume(String databaseName, String ddlStatements);
}
public boolean anyMatch(Predicate<String> databaseFilter, Predicate<TableId> tableFilter) {
return events.stream().anyMatch(event ->
(event instanceof DatabaseEvent) && databaseFilter.test(((DatabaseEvent) event).databaseName())
|| (event instanceof TableEvent) && tableFilter.test(((TableEvent) event).tableId())
|| (event instanceof SetVariableEvent) && (
!((SetVariableEvent) event).databaseName().isPresent()
|| databaseFilter.test(((SetVariableEvent) event).databaseName().get())
)
);
}
}

View File

@ -5,6 +5,8 @@
*/
package io.debezium.relational.ddl;
import java.util.Optional;
import io.debezium.annotation.Immutable;
import io.debezium.relational.TableId;
@ -33,7 +35,7 @@ public interface DdlParserListener {
public static enum EventType {
CREATE_TABLE, ALTER_TABLE, DROP_TABLE, TRUNCATE_TABLE,
CREATE_INDEX, DROP_INDEX,
CREATE_DATABASE, ALTER_DATABASE, DROP_DATABASE,
CREATE_DATABASE, ALTER_DATABASE, DROP_DATABASE, USE_DATABASE,
SET_VARIABLE,
}
@ -293,6 +295,16 @@ public DatabaseDroppedEvent(String databaseName, String ddlStatement) {
}
}
/**
* An event describing the switching of a database.
*/
@Immutable
public static class DatabaseSwitchedEvent extends DatabaseEvent {
public DatabaseSwitchedEvent(String databaseName, String ddlStatement) {
super(EventType.USE_DATABASE, databaseName, ddlStatement);
}
}
/**
* An event describing the setting of a variable.
*/
@ -301,11 +313,13 @@ public static class SetVariableEvent extends Event {
private final String variableName;
private final String value;
private final String databaseName;
public SetVariableEvent(String variableName, String value, String ddlStatement) {
public SetVariableEvent(String variableName, String value, String currentDatabaseName, String ddlStatement) {
super(EventType.SET_VARIABLE, ddlStatement);
this.variableName = variableName;
this.value = value;
this.databaseName = currentDatabaseName;
}
/**
@ -324,6 +338,10 @@ public String variableValue() {
return value;
}
public Optional<String> databaseName() {
return Optional.ofNullable(databaseName);
}
@Override
public String toString() {
return statement();

View File

@ -197,6 +197,10 @@ public void signalSetVariable(String variableName, String variableValue, ParserR
signalSetVariable(variableName, variableValue, getText(ctx));
}
public void signalUseDatabase(ParserRuleContext ctx) {
signalUseDatabase(getText(ctx));
}
/**
* Signal a create database event to ddl changes listener.
*