DBZ-7301 Implement SnapshotQuery for MySQL connector

This commit is contained in:
mfvitale 2024-01-30 16:11:04 +01:00 committed by Jiri Pechanec
parent 0e68bc642d
commit 6cb01f67d1
6 changed files with 205 additions and 8 deletions

View File

@ -543,6 +543,60 @@ public static ConnectorAdapterMode parse(String value, String defaultValue) {
}
}
public enum SnapshotQueryMode implements EnumeratedValue {
/**
* This mode will do a select based on {@code column.include.list} and {@code column.exclude.list} configurations.
*/
SELECT_ALL("select_all"),
CUSTOM("custom");
private final String value;
SnapshotQueryMode(String value) {
this.value = value;
}
@Override
public String getValue() {
return value;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be {@code null}
* @return the matching option, or null if no match is found
*/
public static SnapshotQueryMode parse(String value) {
if (value == null) {
return null;
}
value = value.trim();
for (SnapshotQueryMode option : SnapshotQueryMode.values()) {
if (option.getValue().equalsIgnoreCase(value)) {
return option;
}
}
return null;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be {@code null}
* @param defaultValue the default value; may be {@code null}
* @return the matching option, or null if no match is found and the non-null default is invalid
*/
public static SnapshotQueryMode parse(String value, String defaultValue) {
SnapshotQueryMode mode = parse(value);
if (mode == null && defaultValue != null) {
mode = parse(defaultValue);
}
return mode;
}
}
/**
* {@link Integer#MIN_VALUE Minimum value} used for fetch size hint.
* See <a href="https://issues.jboss.org/browse/DBZ-94">DBZ-94</a> for details.
@ -836,6 +890,31 @@ public static ConnectorAdapterMode parse(String value, String defaultValue) {
+ "'schema_only_recovery' and is only safe to use if no schema changes are happening while the snapshot is taken.")
.withValidation(MySqlConnectorConfig::validateSnapshotLockingMode);
public static final Field SNAPSHOT_QUERY_MODE = Field.create("snapshot.query.mode")
.withDisplayName("Snapshot query mode")
.withEnum(SnapshotQueryMode.class, SnapshotQueryMode.SELECT_ALL)
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 13))
.withDescription("Controls query used during the snapshot");
public static final Field SNAPSHOT_QUERY_MODE_CUSTOM_NAME = Field.create("snapshot.query.mode.custom.name")
.withDisplayName("Snapshot Query Mode Custom Name")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 14))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withValidation((config, field, output) -> {
if (config.getString(SNAPSHOT_QUERY_MODE).equalsIgnoreCase("custom") && config.getString(field, "").isEmpty()) {
output.accept(field, "", "snapshot.query.mode.custom.name cannot be empty when snapshot.query.mode 'custom' is defined");
return 1;
}
return 0;
})
.withDescription(
"When 'snapshot.query.mode' is set as custom, this setting must be set to specify a the name of the custom implementation provided in the 'name()' method. "
+ "The implementations must implement the 'SnapshotterQuery' interface and is called to determine how to build queries during snapshot.");
public static final Field SNAPSHOT_NEW_TABLES = Field.create("snapshot.new.tables")
.withDisplayName("Snapshot newly added tables")
.withEnum(SnapshotNewTables.class, SnapshotNewTables.OFF)
@ -955,6 +1034,8 @@ public static ConnectorAdapterMode parse(String value, String defaultValue) {
KEEP_ALIVE_INTERVAL_MS,
SNAPSHOT_MODE,
SNAPSHOT_LOCKING_MODE,
SNAPSHOT_QUERY_MODE,
SNAPSHOT_QUERY_MODE_CUSTOM_NAME,
SNAPSHOT_NEW_TABLES,
BIGINT_UNSIGNED_HANDLING_MODE,
TIME_PRECISION_MODE,
@ -1006,6 +1087,8 @@ protected boolean supportsSchemaChangesDuringIncrementalSnapshot() {
private final Configuration config;
private final SnapshotMode snapshotMode;
private final SnapshotLockingMode snapshotLockingMode;
private final SnapshotQueryMode snapshotQueryMode;
private final String snapshotQueryModeCustomName;
private final SnapshotNewTables snapshotNewTables;
private final TemporalPrecisionMode temporalPrecisionMode;
private final Duration connectionTimeout;
@ -1028,6 +1111,8 @@ public MySqlConnectorConfig(Configuration config) {
this.temporalPrecisionMode = TemporalPrecisionMode.parse(config.getString(TIME_PRECISION_MODE));
this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE), SNAPSHOT_MODE.defaultValueAsString());
this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString());
this.snapshotQueryMode = SnapshotQueryMode.parse(config.getString(SNAPSHOT_QUERY_MODE), SNAPSHOT_QUERY_MODE.defaultValueAsString());
this.snapshotQueryModeCustomName = config.getString(SNAPSHOT_QUERY_MODE_CUSTOM_NAME, "");
this.readOnlyConnection = config.getBoolean(READ_ONLY_CONNECTION);
final String snapshotNewTables = config.getString(MySqlConnectorConfig.SNAPSHOT_NEW_TABLES);
@ -1058,8 +1143,12 @@ public SnapshotLockingMode getSnapshotLockingMode() {
return this.snapshotLockingMode;
}
public SnapshotNewTables getSnapshotNewTables() {
return snapshotNewTables;
public SnapshotQueryMode snapshotQueryMode() {
return this.snapshotQueryMode;
}
public String snapshotQueryModeCustomName() {
return this.snapshotQueryModeCustomName;
}
private static int validateEventDeserializationFailureHandlingModeNotSet(Configuration config, Field field, ValidationOutput problems) {

View File

@ -22,6 +22,7 @@
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.mysql.MySqlConnectorConfig.BigIntUnsignedHandlingMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.connector.mysql.snapshot.MySqlSnapshotQueryProvider;
import io.debezium.connector.mysql.snapshot.MySqlSnapshotterServiceProvider;
import io.debezium.connector.mysql.strategy.AbstractConnectorConnection;
import io.debezium.connector.mysql.strategy.ConnectorAdapter;
@ -247,7 +248,7 @@ protected void registerServiceProviders(ServiceRegistry serviceRegistry) {
super.registerServiceProviders(serviceRegistry);
// serviceRegistry.registerServiceProvider(new SnapshotLockProvider());
// serviceRegistry.registerServiceProvider(new SnapshotQueryProvider());
serviceRegistry.registerServiceProvider(new MySqlSnapshotQueryProvider());
serviceRegistry.registerServiceProvider(new MySqlSnapshotterServiceProvider());
}

View File

@ -481,17 +481,20 @@ protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext<MySqlP
@Override
protected Optional<String> getSnapshotSelect(RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext,
TableId tableId, List<String> columns) {
return Optional.of(getSnapshotSelect(tableId, columns));
return getSnapshotSelect(tableId, columns);
}
private String getSnapshotSelect(TableId tableId, List<String> columns) {
String snapshotSelectColumns = String.join(", ", columns);
return String.format("SELECT %s FROM `%s`.`%s`", snapshotSelectColumns, tableId.catalog(), tableId.table());
private Optional<String> getSnapshotSelect(TableId tableId, List<String> columns) {
return snapshotterService.getSnapshotQuery().snapshotQuery(tableId.toQuotedString('`'), columns);
}
@Override
protected Optional<String> getSnapshotConnectionFirstSelect(RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext, TableId tableId) {
return Optional.of(getSnapshotSelect(tableId, List.of("*")) + " LIMIT 1");
if (getSnapshotSelect(tableId, List.of("*")).isEmpty()) {
return Optional.empty();
}
return Optional.of(getSnapshotSelect(tableId, List.of("*")).get() + " LIMIT 1");
}
private boolean isGloballyLocked() {

View File

@ -0,0 +1,69 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql.snapshot;
import static io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotQueryMode.CUSTOM;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.stream.StreamSupport;
import io.debezium.DebeziumException;
import io.debezium.bean.StandardBeanNames;
import io.debezium.bean.spi.BeanRegistry;
import io.debezium.bean.spi.BeanRegistryAware;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotQueryMode;
import io.debezium.service.spi.ServiceProvider;
import io.debezium.service.spi.ServiceRegistry;
import io.debezium.snapshot.spi.SnapshotQuery;
/**
* An implementation of the {@link ServiceProvider} contract for the {@link SnapshotQuery}.
*
* @author Mario Fiore Vitale
*/
public class MySqlSnapshotQueryProvider implements ServiceProvider<SnapshotQuery> {
@Override
public SnapshotQuery createService(Configuration configuration, ServiceRegistry serviceRegistry) {
BeanRegistry beanRegistry = serviceRegistry.tryGetService(BeanRegistry.class);
MySqlConnectorConfig postgresConnectorConfig = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, MySqlConnectorConfig.class);
final SnapshotQueryMode configuredSnapshotQueryMode = postgresConnectorConfig.snapshotQueryMode();
final String snapshotQueryModeCustomName = postgresConnectorConfig.snapshotQueryModeCustomName();
String snapshotQueryMode;
if (CUSTOM.equals(configuredSnapshotQueryMode) && !snapshotQueryModeCustomName.isEmpty()) {
snapshotQueryMode = snapshotQueryModeCustomName;
}
else {
snapshotQueryMode = configuredSnapshotQueryMode.getValue();
}
Optional<SnapshotQuery> snapshotQuery = StreamSupport.stream(ServiceLoader.load(SnapshotQuery.class).spliterator(), false)
.filter(s -> s.name().equals(snapshotQueryMode))
.findAny();
return snapshotQuery.map(s -> {
s.configure(configuration.asMap());
if (s instanceof BeanRegistryAware) {
((BeanRegistryAware) s).injectBeanRegistry(beanRegistry);
}
return s;
})
.orElseThrow(() -> new DebeziumException(String.format("Unable to find %s snapshot query mode. Please check your configuration.", snapshotQueryMode)));
}
@Override
public Class<SnapshotQuery> getServiceClass() {
return SnapshotQuery.class;
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql.snapshot.query;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.snapshot.spi.SnapshotQuery;
public class SelectAllSnapshotQuery implements SnapshotQuery {
@Override
public String name() {
return MySqlConnectorConfig.SnapshotQueryMode.SELECT_ALL.getValue();
}
@Override
public void configure(Map<String, ?> properties) {
}
@Override
public Optional<String> snapshotQuery(String tableId, List<String> snapshotSelectColumns) {
return Optional.of(snapshotSelectColumns.stream()
.collect(Collectors.joining(", ", "SELECT ", " FROM " + tableId)));
}
}

View File

@ -0,0 +1 @@
io.debezium.connector.mysql.snapshot.query.SelectAllSnapshotQuery