DBZ-7301 Support Snapshotter for MariaDB

This commit is contained in:
mfvitale 2024-01-26 15:22:31 +01:00 committed by Jiri Pechanec
parent 1cdf2836dd
commit 0e68bc642d
5 changed files with 24 additions and 18 deletions

View File

@ -7,6 +7,11 @@
import io.debezium.bean.spi.BeanRegistry;
import io.debezium.bean.spi.BeanRegistryAware;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.strategy.AbstractConnectorConnection;
import io.debezium.connector.mysql.strategy.mariadb.MariaDbConnection;
import io.debezium.connector.mysql.strategy.mariadb.MariaDbConnectorAdapter;
import io.debezium.connector.mysql.strategy.mysql.MySqlConnection;
public class BeanAwareSnapshotter implements BeanRegistryAware {
protected BeanRegistry beanRegistry;
@ -15,4 +20,14 @@ public class BeanAwareSnapshotter implements BeanRegistryAware {
public void injectBeanRegistry(BeanRegistry beanRegistry) {
this.beanRegistry = beanRegistry;
}
protected Class<? extends AbstractConnectorConnection> getConnectionClass(MySqlConnectorConfig config) {
// TODO review this when MariaDB becomes a first class connector
if (config.getConnectorAdapter() instanceof MariaDbConnectorAdapter) {
return MariaDbConnection.class;
}
else {
return MySqlConnection.class;
}
}
}

View File

@ -15,7 +15,7 @@
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.connector.mysql.strategy.mysql.MySqlConnection;
import io.debezium.connector.mysql.strategy.AbstractConnectorConnection;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.spi.snapshot.Snapshotter;
@ -36,8 +36,8 @@ public void configure(Map<String, ?> properties) {
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
final MySqlConnection connection = beanRegistry.lookupByName(StandardBeanNames.JDBC_CONNECTION, MySqlConnection.class);
final MySqlConnectorConfig config = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, MySqlConnectorConfig.class);
final AbstractConnectorConnection connection = beanRegistry.lookupByName(StandardBeanNames.JDBC_CONNECTION, getConnectionClass(config));
final Offsets<MySqlPartition, MySqlOffsetContext> mySqloffsets = beanRegistry.lookupByName(StandardBeanNames.OFFSETS, Offsets.class);
final MySqlOffsetContext offset = mySqloffsets.getTheOnlyOffset();

View File

@ -15,7 +15,7 @@
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.connector.mysql.strategy.mysql.MySqlConnection;
import io.debezium.connector.mysql.strategy.AbstractConnectorConnection;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.spi.snapshot.Snapshotter;
@ -36,8 +36,8 @@ public void configure(Map<String, ?> properties) {
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
final MySqlConnection connection = beanRegistry.lookupByName(StandardBeanNames.JDBC_CONNECTION, MySqlConnection.class);
final MySqlConnectorConfig config = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, MySqlConnectorConfig.class);
final AbstractConnectorConnection connection = beanRegistry.lookupByName(StandardBeanNames.JDBC_CONNECTION, getConnectionClass(config));
final Offsets<MySqlPartition, MySqlOffsetContext> mySqloffsets = beanRegistry.lookupByName(StandardBeanNames.OFFSETS, Offsets.class);
final MySqlOffsetContext offset = mySqloffsets.getTheOnlyOffset();

View File

@ -15,7 +15,7 @@
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.connector.mysql.strategy.mysql.MySqlConnection;
import io.debezium.connector.mysql.strategy.AbstractConnectorConnection;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.spi.snapshot.Snapshotter;
@ -36,8 +36,8 @@ public void configure(Map<String, ?> properties) {
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
final MySqlConnection connection = beanRegistry.lookupByName(StandardBeanNames.JDBC_CONNECTION, MySqlConnection.class);
final MySqlConnectorConfig config = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, MySqlConnectorConfig.class);
final AbstractConnectorConnection connection = beanRegistry.lookupByName(StandardBeanNames.JDBC_CONNECTION, getConnectionClass(config));
final Offsets<MySqlPartition, MySqlOffsetContext> mySqloffsets = beanRegistry.lookupByName(StandardBeanNames.OFFSETS, Offsets.class);
final MySqlOffsetContext offset = mySqloffsets.getTheOnlyOffset();

View File

@ -11,21 +11,17 @@
import org.slf4j.LoggerFactory;
import io.debezium.bean.StandardBeanNames;
import io.debezium.bean.spi.BeanRegistry;
import io.debezium.bean.spi.BeanRegistryAware;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.connector.mysql.strategy.mysql.MySqlConnection;
import io.debezium.connector.mysql.strategy.AbstractConnectorConnection;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.spi.snapshot.Snapshotter;
public class WhenNeededSnapshotter implements Snapshotter, BeanRegistryAware {
public class WhenNeededSnapshotter extends BeanAwareSnapshotter implements Snapshotter {
private static final Logger LOGGER = LoggerFactory.getLogger(WhenNeededSnapshotter.class);
private BeanRegistry beanRegistry;
@Override
public String name() {
return MySqlConnectorConfig.SnapshotMode.WHEN_NEEDED.getValue();
@ -36,16 +32,11 @@ public void configure(Map<String, ?> properties) {
}
@Override
public void injectBeanRegistry(BeanRegistry beanRegistry) {
this.beanRegistry = beanRegistry;
}
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
final MySqlConnection connection = beanRegistry.lookupByName(StandardBeanNames.JDBC_CONNECTION, MySqlConnection.class);
final MySqlConnectorConfig config = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, MySqlConnectorConfig.class);
final AbstractConnectorConnection connection = beanRegistry.lookupByName(StandardBeanNames.JDBC_CONNECTION, getConnectionClass(config));
final Offsets<MySqlPartition, MySqlOffsetContext> mySqloffsets = beanRegistry.lookupByName(StandardBeanNames.OFFSETS, Offsets.class);
final MySqlOffsetContext offset = mySqloffsets.getTheOnlyOffset();