DBZ-3622 Decouple database and schema history record writes

This commit is contained in:
Jiri Pechanec 2021-06-28 12:53:38 +02:00 committed by Gunnar Morling
parent 443c0ce089
commit 472b82abb7
6 changed files with 55 additions and 34 deletions

View File

@ -49,6 +49,7 @@
import io.debezium.text.ParsingException;
import io.debezium.util.Collect;
import io.debezium.util.SchemaNameAdjuster;
import io.debezium.util.Strings;
/**
* Component that records the schema history for databases hosted by a MySQL database server. The schema information includes
@ -79,6 +80,7 @@ public class MySqlDatabaseSchema extends HistorizedRelationalDatabaseSchema {
private final DdlChanges ddlChanges;
private final Map<Long, TableId> tableIdsByTableNumber = new ConcurrentHashMap<>();
private boolean storageInitialiationExecuted = false;
private final MySqlConnectorConfig connectorConfig;
/**
* Create a schema component given the supplied {@link MySqlConnectorConfig MySQL connector configuration}.
@ -98,6 +100,7 @@ public MySqlDatabaseSchema(MySqlConnectorConfig connectorConfig, MySqlValueConve
this.ddlParser = new MySqlAntlrDdlParser(valueConverter, getTableFilter());
this.ddlChanges = this.ddlParser.getDdlChanges();
this.connectorConfig = connectorConfig;
filters = connectorConfig.getTableFilters();
}
@ -369,4 +372,13 @@ public void initializeStorage() {
public boolean isStorageInitializationExecuted() {
return storageInitialiationExecuted;
}
public boolean skipSchemaChangeEvent(SchemaChangeEvent event) {
if (!Strings.isNullOrEmpty(event.getDatabase())
&& !connectorConfig.getTableFilters().databaseFilter().test(event.getDatabase())) {
LOGGER.debug("Skipping schema event as it belongs to a non-captured database: '{}'", event);
return true;
}
return false;
}
}

View File

@ -523,9 +523,7 @@ protected void createSchemaChangeEventsForTables(ChangeEventSourceContext source
throw new InterruptedException("Interrupted while processing event " + event);
}
if (databaseSchema.storeOnlyCapturedTables() && event.getDatabase() != null && event.getDatabase().length() != 0
&& !connectorConfig.getTableFilters().databaseFilter().test(event.getDatabase())) {
LOGGER.debug("Skipping schema event as it belongs to a non-captured database: '{}'", event);
if (databaseSchema.skipSchemaChangeEvent(event)) {
continue;
}

View File

@ -572,6 +572,10 @@ else if (eventDeserializationFailureHandlingMode == EventProcessingFailureHandli
clock.currentTimeAsInstant());
try {
for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
if (taskContext.getSchema().skipSchemaChangeEvent(schemaChangeEvent)) {
continue;
}
final TableId tableId = schemaChangeEvent.getTables().isEmpty() ? null : schemaChangeEvent.getTables().iterator().next().id();
eventDispatcher.dispatchSchemaChangeEvent(tableId, (receiver) -> {
try {

View File

@ -6,10 +6,7 @@
package io.debezium.connector.mysql;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Set;
@ -20,6 +17,7 @@
import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
import io.debezium.config.Configuration;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.jdbc.TemporalPrecisionMode;
@ -95,7 +93,7 @@ public void shouldApplyDdlStatementsAndRecover() throws InterruptedException {
offset.setBinlogStartPoint("binlog-001", 400);
mysql.parseStreamingDdl("SET " + MySqlSystemVariables.CHARSET_NAME_SERVER + "=utf8mb4", null, offset,
Instant.now()).forEach(x -> mysql.applySchemaChange(x));
mysql.parseStreamingDdl(readFile("ddl/mysql-products.ddl"), "db1", offset,
mysql.parseStreamingDdl(IoUtil.readClassPathResource("ddl/mysql-products.ddl"), "db1", offset,
Instant.now()).forEach(x -> mysql.applySchemaChange(x));
// Check that we have tables ...
@ -120,9 +118,9 @@ public void shouldIgnoreUnparseableDdlAndRecover() throws InterruptedException {
offset.setBinlogStartPoint("binlog-001", 400);
mysql.parseStreamingDdl("SET " + MySqlSystemVariables.CHARSET_NAME_SERVER + "=utf8mb4", null, offset,
Instant.now()).forEach(x -> mysql.applySchemaChange(x));
mysql.parseStreamingDdl("xxxCREATE TABLE mytable\n" + readFile("ddl/mysql-products.ddl"), "db1", offset,
mysql.parseStreamingDdl("xxxCREATE TABLE mytable\n" + IoUtil.readClassPathResource("ddl/mysql-products.ddl"), "db1", offset,
Instant.now()).forEach(x -> mysql.applySchemaChange(x));
mysql.parseStreamingDdl(readFile("ddl/mysql-products.ddl"), "db1", offset,
mysql.parseStreamingDdl(IoUtil.readClassPathResource("ddl/mysql-products.ddl"), "db1", offset,
Instant.now()).forEach(x -> mysql.applySchemaChange(x));
// Check that we have tables ...
@ -146,7 +144,7 @@ public void shouldFailOnUnparseableDdl() throws InterruptedException {
offset.setBinlogStartPoint("binlog-001", 400);
mysql.parseStreamingDdl("SET " + MySqlSystemVariables.CHARSET_NAME_SERVER + "=utf8mb4", null, offset,
Instant.now()).forEach(x -> mysql.applySchemaChange(x));
mysql.parseStreamingDdl("xxxCREATE TABLE mytable\n" + readFile("ddl/mysql-products.ddl"), "db1", offset,
mysql.parseStreamingDdl("xxxCREATE TABLE mytable\n" + IoUtil.readClassPathResource("ddl/mysql-products.ddl"), "db1", offset,
Instant.now()).forEach(x -> mysql.applySchemaChange(x));
}
@ -164,11 +162,11 @@ public void shouldLoadSystemAndNonSystemTablesAndConsumeOnlyFilteredDatabases()
offset.setBinlogStartPoint("binlog-001", 400);
mysql.parseStreamingDdl("SET " + MySqlSystemVariables.CHARSET_NAME_SERVER + "=utf8mb4", null, offset,
Instant.now()).forEach(x -> mysql.applySchemaChange(x));
mysql.parseStreamingDdl(readFile("ddl/mysql-test-init-5.7.ddl"), "mysql", offset,
mysql.parseStreamingDdl(IoUtil.readClassPathResource("ddl/mysql-test-init-5.7.ddl"), "mysql", offset,
Instant.now()).forEach(x -> mysql.applySchemaChange(x));
offset.setBinlogStartPoint("binlog-001", 1000);
mysql.parseStreamingDdl(readFile("ddl/mysql-products.ddl"), "db1", offset,
mysql.parseStreamingDdl(IoUtil.readClassPathResource("ddl/mysql-products.ddl"), "db1", offset,
Instant.now()).forEach(x -> mysql.applySchemaChange(x));
// Check that we have tables ...
@ -196,11 +194,11 @@ public void shouldLoadSystemAndNonSystemTablesAndConsumeAllDatabases() throws In
offset.setBinlogStartPoint("binlog-001", 400);
mysql.parseStreamingDdl("SET " + MySqlSystemVariables.CHARSET_NAME_SERVER + "=utf8mb4", null, offset,
Instant.now()).forEach(x -> mysql.applySchemaChange(x));
mysql.parseStreamingDdl(readFile("ddl/mysql-test-init-5.7.ddl"), "mysql", offset,
mysql.parseStreamingDdl(IoUtil.readClassPathResource("ddl/mysql-test-init-5.7.ddl"), "mysql", offset,
Instant.now()).forEach(x -> mysql.applySchemaChange(x));
offset.setBinlogStartPoint("binlog-001", 1000);
mysql.parseStreamingDdl(readFile("ddl/mysql-products.ddl"), "db1", offset,
mysql.parseStreamingDdl(IoUtil.readClassPathResource("ddl/mysql-products.ddl"), "db1", offset,
Instant.now()).forEach(x -> mysql.applySchemaChange(x));
// Check that we have tables ...
@ -225,7 +223,7 @@ public void shouldAllowDecimalPrecision() {
// Set up the server ...
offset.setBinlogStartPoint("binlog-001", 400);
mysql.parseStreamingDdl(readFile("ddl/mysql-decimal-issue.ddl"), "db1", offset,
mysql.parseStreamingDdl(IoUtil.readClassPathResource("ddl/mysql-decimal-issue.ddl"), "db1", offset,
Instant.now()).forEach(x -> mysql.applySchemaChange(x));
assertTableIncluded("connector_test.business_order");
@ -234,6 +232,7 @@ public void shouldAllowDecimalPrecision() {
}
@Test
@FixFor("DBZ-3622")
public void shouldStoreNonCapturedDatabase() {
// Testing.Print.enable();
final Configuration config = DATABASE.defaultConfig()
@ -246,7 +245,7 @@ public void shouldStoreNonCapturedDatabase() {
// Set up the server ...
offset.setBinlogStartPoint("binlog-001", 400);
mysql.parseStreamingDdl(readFile("ddl/mysql-schema-captured.ddl"), "db1", offset,
mysql.parseStreamingDdl(IoUtil.readClassPathResource("ddl/mysql-schema-captured.ddl"), "db1", offset,
Instant.now()).forEach(x -> mysql.applySchemaChange(x));
assertTableIncluded("captured.ct");
@ -263,6 +262,7 @@ public void shouldStoreNonCapturedDatabase() {
}
@Test
@FixFor("DBZ-3622")
public void shouldNotStoreNonCapturedDatabase() {
// Testing.Print.enable();
final Configuration config = DATABASE.defaultConfig()
@ -276,7 +276,7 @@ public void shouldNotStoreNonCapturedDatabase() {
// Set up the server ...
offset.setBinlogStartPoint("binlog-001", 400);
mysql.parseStreamingDdl(readFile("ddl/mysql-schema-captured.ddl"), "db1", offset,
mysql.parseStreamingDdl(IoUtil.readClassPathResource("ddl/mysql-schema-captured.ddl"), "db1", offset,
Instant.now()).forEach(x -> mysql.applySchemaChange(x));
assertTableIncluded("captured.ct");
@ -293,6 +293,7 @@ public void shouldNotStoreNonCapturedDatabase() {
}
@Test
@FixFor("DBZ-3622")
public void shouldStoreNonCapturedTable() {
// Testing.Print.enable();
final Configuration config = DATABASE.defaultConfigWithoutDatabaseFilter()
@ -305,7 +306,7 @@ public void shouldStoreNonCapturedTable() {
// Set up the server ...
offset.setBinlogStartPoint("binlog-001", 400);
mysql.parseStreamingDdl(readFile("ddl/mysql-schema-captured.ddl"), "db1", offset,
mysql.parseStreamingDdl(IoUtil.readClassPathResource("ddl/mysql-schema-captured.ddl"), "db1", offset,
Instant.now()).forEach(x -> mysql.applySchemaChange(x));
assertTableIncluded("captured.ct");
@ -322,6 +323,7 @@ public void shouldStoreNonCapturedTable() {
}
@Test
@FixFor("DBZ-3622")
public void shouldNotStoreNonCapturedTable() {
// Testing.Print.enable();
final Configuration config = DATABASE.defaultConfigWithoutDatabaseFilter()
@ -335,7 +337,7 @@ public void shouldNotStoreNonCapturedTable() {
// Set up the server ...
offset.setBinlogStartPoint("binlog-001", 400);
mysql.parseStreamingDdl(readFile("ddl/mysql-schema-captured.ddl"), "db1", offset,
mysql.parseStreamingDdl(IoUtil.readClassPathResource("ddl/mysql-schema-captured.ddl"), "db1", offset,
Instant.now()).forEach(x -> mysql.applySchemaChange(x));
assertTableIncluded("captured.ct");
@ -402,16 +404,4 @@ protected void assertHistoryRecorded(Configuration config, OffsetContext offset)
protected void printStatements(String dbName, Set<TableId> tables, String ddlStatements) {
Testing.print("Running DDL for '" + dbName + "': " + ddlStatements + " changing tables '" + tables + "'");
}
protected String readFile(String classpathResource) {
try (InputStream stream = getClass().getClassLoader().getResourceAsStream(classpathResource);) {
assertThat(stream).isNotNull();
return IoUtil.read(stream);
}
catch (IOException e) {
fail("Unable to read '" + classpathResource + "'");
}
assert false : "should never get here";
return null;
}
}

View File

@ -468,9 +468,9 @@ public static void setLogFilesForMining(OracleConnection connection, Scn lastPro
List<LogFile> logFilesForMining = getLogFilesForOffsetScn(connection, lastProcessedScn, archiveLogRetention, archiveLogOnlyMode, archiveDestinationName);
if (!logFilesForMining.stream().anyMatch(l -> l.getFirstScn().compareTo(lastProcessedScn) <= 0)) {
Scn minScn = logFilesForMining.stream()
.map(LogFile::getFirstScn)
.min(Scn::compareTo)
.orElse(Scn.NULL);
.map(LogFile::getFirstScn)
.min(Scn::compareTo)
.orElse(Scn.NULL);
if ((minScn.isNull() || logFilesForMining.isEmpty()) && archiveLogOnlyMode) {
throw new DebeziumException("The log.mining.archive.log.only mode was recently enabled and the offset SCN " +

View File

@ -31,6 +31,7 @@
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.EnumSet;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.function.Consumer;
@ -39,6 +40,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.annotation.Immutable;
/**
@ -521,6 +523,21 @@ public static Properties loadProperties(Class<?> clazz, String classpathResource
return loadProperties(clazz::getClassLoader, classpathResource);
}
/**
* Read a resource on classpath as a String
* @param classpathResource
* @return the content of resource as String
*/
public static String readClassPathResource(String classpathResource) {
try (InputStream stream = IoUtil.class.getClassLoader().getResourceAsStream(classpathResource)) {
Objects.requireNonNull(stream);
return IoUtil.read(stream);
}
catch (IOException e) {
throw new DebeziumException("Unable to read '" + classpathResource + "'");
}
}
private IoUtil() {
// Prevent construction
}