DBZ-7732 Add ConnectorSpecific annotation to manage multiple SPI connector specific implementation

This commit is contained in:
mfvitale 2024-04-05 11:43:11 +02:00 committed by Jiri Pechanec
parent 4ca0e7ffb9
commit ae7628a732
23 changed files with 694 additions and 15 deletions

View File

@ -9,9 +9,12 @@
import java.util.Map;
import java.util.Optional;
import io.debezium.annotation.ConnectorSpecific;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.snapshot.spi.SnapshotQuery;
@ConnectorSpecific(connector = MongoDbConnector.class)
public class SelectAllSnapshotQuery implements SnapshotQuery {
@Override

View File

@ -8,6 +8,10 @@
import java.time.Duration;
import java.util.Optional;
import io.debezium.annotation.ConnectorSpecific;
import io.debezium.connector.mysql.MySqlConnector;
@ConnectorSpecific(connector = MySqlConnector.class)
public abstract class DefaultSnapshotLock {
public Optional<String> tableLockingStatement(Duration lockTimeout, String tableId) {
return Optional.of("FLUSH TABLES WITH READ LOCK");

View File

@ -7,9 +7,12 @@
import java.util.Map;
import io.debezium.annotation.ConnectorSpecific;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.snapshot.spi.SnapshotLock;
@ConnectorSpecific(connector = MySqlConnector.class)
public class ExtendedSnapshotLock extends DefaultSnapshotLock implements SnapshotLock {
@Override

View File

@ -9,9 +9,12 @@
import java.util.Map;
import java.util.Optional;
import io.debezium.annotation.ConnectorSpecific;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.snapshot.spi.SnapshotLock;
@ConnectorSpecific(connector = MySqlConnector.class)
public class MinimalPerconaSnapshotLock implements SnapshotLock {
@Override

View File

@ -7,9 +7,12 @@
import java.util.Map;
import io.debezium.annotation.ConnectorSpecific;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.snapshot.spi.SnapshotLock;
@ConnectorSpecific(connector = MySqlConnector.class)
public class MinimalSnapshotLock extends DefaultSnapshotLock implements SnapshotLock {
@Override

View File

@ -9,9 +9,12 @@
import java.util.Map;
import java.util.Optional;
import io.debezium.annotation.ConnectorSpecific;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.snapshot.spi.SnapshotLock;
@ConnectorSpecific(connector = MySqlConnector.class)
public class NoneSnapshotLock implements SnapshotLock {
@Override

View File

@ -10,9 +10,12 @@
import java.util.Optional;
import java.util.stream.Collectors;
import io.debezium.annotation.ConnectorSpecific;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.snapshot.spi.SnapshotQuery;
@ConnectorSpecific(connector = MySqlConnector.class)
public class SelectAllSnapshotQuery implements SnapshotQuery {
@Override

View File

@ -9,9 +9,12 @@
import java.util.Map;
import java.util.Optional;
import io.debezium.annotation.ConnectorSpecific;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.snapshot.spi.SnapshotLock;
@ConnectorSpecific(connector = OracleConnector.class)
public class NoSnapshotLock implements SnapshotLock {
@Override

View File

@ -9,9 +9,12 @@
import java.util.Map;
import java.util.Optional;
import io.debezium.annotation.ConnectorSpecific;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.snapshot.spi.SnapshotLock;
@ConnectorSpecific(connector = OracleConnector.class)
public class SharedSnapshotLock implements SnapshotLock {
@Override

View File

@ -9,14 +9,17 @@
import java.util.Map;
import java.util.Optional;
import io.debezium.annotation.ConnectorSpecific;
import io.debezium.bean.StandardBeanNames;
import io.debezium.bean.spi.BeanRegistry;
import io.debezium.bean.spi.BeanRegistryAware;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.snapshot.spi.SnapshotQuery;
@ConnectorSpecific(connector = OracleConnector.class)
public class SelectAllSnapshotQuery implements SnapshotQuery, BeanRegistryAware {
private BeanRegistry beanRegistry;

View File

@ -9,9 +9,12 @@
import java.util.Map;
import java.util.Optional;
import io.debezium.annotation.ConnectorSpecific;
import io.debezium.connector.postgresql.PostgresConnector;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.snapshot.spi.SnapshotLock;
@ConnectorSpecific(connector = PostgresConnector.class)
public class NoSnapshotLock implements SnapshotLock {
@Override

View File

@ -9,9 +9,12 @@
import java.util.Map;
import java.util.Optional;
import io.debezium.annotation.ConnectorSpecific;
import io.debezium.connector.postgresql.PostgresConnector;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.snapshot.spi.SnapshotLock;
@ConnectorSpecific(connector = PostgresConnector.class)
public class SharedSnapshotLock implements SnapshotLock {
@Override

View File

@ -10,9 +10,12 @@
import java.util.Optional;
import java.util.stream.Collectors;
import io.debezium.annotation.ConnectorSpecific;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.postgresql.PostgresConnector;
import io.debezium.snapshot.spi.SnapshotQuery;
@ConnectorSpecific(connector = PostgresConnector.class)
public class SelectAllSnapshotQuery implements SnapshotQuery {
@Override

View File

@ -9,9 +9,12 @@
import java.util.Map;
import java.util.Optional;
import io.debezium.annotation.ConnectorSpecific;
import io.debezium.connector.sqlserver.SqlServerConnector;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.snapshot.spi.SnapshotLock;
@ConnectorSpecific(connector = SqlServerConnector.class)
public class ExclusiveSnapshotLock implements SnapshotLock {
@Override

View File

@ -9,9 +9,12 @@
import java.util.Map;
import java.util.Optional;
import io.debezium.annotation.ConnectorSpecific;
import io.debezium.connector.sqlserver.SqlServerConnector;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.snapshot.spi.SnapshotLock;
@ConnectorSpecific(connector = SqlServerConnector.class)
public class NoSnapshotLock implements SnapshotLock {
@Override

View File

@ -9,9 +9,12 @@
import java.util.Map;
import java.util.Optional;
import io.debezium.annotation.ConnectorSpecific;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.sqlserver.SqlServerConnector;
import io.debezium.snapshot.spi.SnapshotQuery;
@ConnectorSpecific(connector = SqlServerConnector.class)
public class SelectAllSnapshotQuery implements SnapshotQuery {
@Override

View File

@ -0,0 +1,30 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import io.debezium.connector.common.BaseSourceConnector;
/**
* Marks a class to be connector specific SPI implementation.
* <p>
* Used in combination with SPI loading mechanism where different implementation, of same interface,
* is provided by different connectors.
* Marking an implementation with this annotation will instruct {@link io.debezium.snapshot.SnapshotLockProvider} and {@link io.debezium.snapshot.SnapshotQueryProvider}
* which implementation to load.
*/
@Documented
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface ConnectorSpecific {
Class<? extends BaseSourceConnector> connector();
}

View File

@ -1528,7 +1528,7 @@ public boolean snapshotModeConfigurationBasedSnapshotOnDataError() {
return this.snapshotModeConfigurationBasedSnapshotOnDataError;
}
public SnapshotQueryMode snapshotQueryMode() {
public EnumeratedValue snapshotQueryMode() {
return this.snapshotQueryMode;
}

View File

@ -0,0 +1,34 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.snapshot;
import io.debezium.annotation.ConnectorSpecific;
import io.debezium.config.Configuration;
import io.debezium.connector.common.BaseSourceConnector;
public class AbstractSnapshotProvider {
protected boolean isForCurrentConnector(Configuration configuration, Class<?> implementationClass) {
ConnectorSpecific annotation = implementationClass.getAnnotation(ConnectorSpecific.class);
if (annotation == null) {
return false;
}
Class<? extends BaseSourceConnector> connectorClass = annotation.connector();
return connectorClass == getConnectorClass(configuration);
}
private Class<?> getConnectorClass(Configuration config) {
try {
return Class.forName(config.getString("connector.class"));
}
catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -5,10 +5,16 @@
*/
package io.debezium.snapshot;
import java.util.List;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.bean.StandardBeanNames;
import io.debezium.bean.spi.BeanRegistry;
@ -25,7 +31,22 @@
*
* @author Mario Fiore Vitale
*/
public class SnapshotLockProvider implements ServiceProvider<SnapshotLock> {
public class SnapshotLockProvider extends AbstractSnapshotProvider implements ServiceProvider<SnapshotLock> {
private static final Logger LOGGER = LoggerFactory.getLogger(SnapshotLockProvider.class);
final List<SnapshotLock> snapshotLockImplementations;
public SnapshotLockProvider() {
this.snapshotLockImplementations = StreamSupport.stream(ServiceLoader.load(SnapshotLock.class).spliterator(), false)
.collect(Collectors.toList());
}
public SnapshotLockProvider(List<SnapshotLock> snapshotLockImplementations) {
this.snapshotLockImplementations = snapshotLockImplementations;
}
@Override
public SnapshotLock createService(Configuration configuration, ServiceRegistry serviceRegistry) {
@ -37,23 +58,38 @@ public SnapshotLock createService(Configuration configuration, ServiceRegistry s
final String snapshotLockingModeCustomName = commonConnectorConfig.snapshotLockingModeCustomName();
String snapshotLockMode;
Predicate<SnapshotLock> byNameAndConnectorFilter;
Predicate<SnapshotLock> byNameFilter;
if ("custom".equals(configuredSnapshotLockingMode) && !snapshotLockingModeCustomName.isEmpty()) {
snapshotLockMode = snapshotLockingModeCustomName;
byNameAndConnectorFilter = byNameFilter = snapshotLockImplementation -> snapshotLockImplementation.name().equals(snapshotLockMode);
}
else {
snapshotLockMode = configuredSnapshotLockingMode;
byNameFilter = snapshotLockImplementation -> snapshotLockImplementation.name().equals(snapshotLockMode);
byNameAndConnectorFilter = byNameFilter.and(snapshotLockImplementation -> isForCurrentConnector(configuration, snapshotLockImplementation.getClass()));
}
Optional<SnapshotLock> snapshotLock = StreamSupport.stream(ServiceLoader.load(SnapshotLock.class).spliterator(), false)
.filter(s -> s.name().equalsIgnoreCase(snapshotLockMode))
Optional<? extends SnapshotLock> snapshotLock = snapshotLockImplementations.stream()
.filter(byNameAndConnectorFilter)
.findAny();
return snapshotLock.map(s -> {
s.configure(configuration.asMap());
if (s instanceof BeanRegistryAware) {
((BeanRegistryAware) s).injectBeanRegistry(beanRegistry);
if (snapshotLock.isEmpty()) { // Fallback on generic implementation
snapshotLock = snapshotLockImplementations.stream()
.filter(byNameFilter)
.findAny();
snapshotLock.ifPresent(lockImpl -> LOGGER.warn("Found a not connector specific implementation {} for lock mode {}",
lockImpl.getClass().getName(),
snapshotLockMode));
}
return snapshotLock.map(snapshotLockImpl -> {
snapshotLockImpl.configure(configuration.asMap());
if (snapshotLockImpl instanceof BeanRegistryAware) {
((BeanRegistryAware) snapshotLockImpl).injectBeanRegistry(beanRegistry);
}
return s;
return snapshotLockImpl;
})
.orElseThrow(() -> new DebeziumException(String.format("Unable to find %s snapshot lock mode. Please check your configuration.", snapshotLockMode)));

View File

@ -7,17 +7,23 @@
import static io.debezium.config.CommonConnectorConfig.SnapshotQueryMode.CUSTOM;
import java.util.List;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.bean.StandardBeanNames;
import io.debezium.bean.spi.BeanRegistry;
import io.debezium.bean.spi.BeanRegistryAware;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.CommonConnectorConfig.SnapshotQueryMode;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.service.spi.ServiceProvider;
import io.debezium.service.spi.ServiceRegistry;
import io.debezium.snapshot.spi.SnapshotQuery;
@ -27,7 +33,22 @@
*
* @author Mario Fiore Vitale
*/
public class SnapshotQueryProvider implements ServiceProvider<SnapshotQuery> {
public class SnapshotQueryProvider extends AbstractSnapshotProvider implements ServiceProvider<SnapshotQuery> {
private static final Logger LOGGER = LoggerFactory.getLogger(SnapshotQueryProvider.class);
final List<SnapshotQuery> snapshotQueryImplementations;
public SnapshotQueryProvider() {
this.snapshotQueryImplementations = StreamSupport.stream(ServiceLoader.load(SnapshotQuery.class).spliterator(), false)
.collect(Collectors.toList());
}
public SnapshotQueryProvider(List<SnapshotQuery> snapshotQueryImplementations) {
this.snapshotQueryImplementations = snapshotQueryImplementations;
}
@Override
public SnapshotQuery createService(Configuration configuration, ServiceRegistry serviceRegistry) {
@ -35,21 +56,35 @@ public SnapshotQuery createService(Configuration configuration, ServiceRegistry
BeanRegistry beanRegistry = serviceRegistry.tryGetService(BeanRegistry.class);
CommonConnectorConfig commonConnectorConfig = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, CommonConnectorConfig.class);
final SnapshotQueryMode configuredSnapshotQueryMode = commonConnectorConfig.snapshotQueryMode();
final EnumeratedValue configuredSnapshotQueryMode = commonConnectorConfig.snapshotQueryMode();
final String snapshotQueryModeCustomName = commonConnectorConfig.snapshotQueryModeCustomName();
String snapshotQueryMode;
Predicate<SnapshotQuery> byNameAndConnectorFilter;
Predicate<SnapshotQuery> byNameFilter;
if (CUSTOM.equals(configuredSnapshotQueryMode) && !snapshotQueryModeCustomName.isEmpty()) {
snapshotQueryMode = snapshotQueryModeCustomName;
byNameFilter = byNameAndConnectorFilter = snapshotQueryImplementation -> snapshotQueryImplementation.name().equals(snapshotQueryMode);
}
else {
snapshotQueryMode = configuredSnapshotQueryMode.getValue();
byNameFilter = snapshotQueryImplementation -> snapshotQueryImplementation.name().equals(snapshotQueryMode);
byNameAndConnectorFilter = byNameFilter.and(snapshotQueryImplementation -> isForCurrentConnector(configuration, snapshotQueryImplementation.getClass()));
}
Optional<SnapshotQuery> snapshotQuery = StreamSupport.stream(ServiceLoader.load(SnapshotQuery.class).spliterator(), false)
.filter(s -> s.name().equals(snapshotQueryMode))
Optional<SnapshotQuery> snapshotQuery = snapshotQueryImplementations.stream()
.filter(byNameAndConnectorFilter)
.findAny();
if (snapshotQuery.isEmpty()) { // Fallback on generic implementation
snapshotQuery = snapshotQueryImplementations.stream()
.filter(byNameFilter)
.findAny();
snapshotQuery.ifPresent(QueryImpl -> LOGGER.warn("Found a not connector specific implementation {} for query mode {}",
QueryImpl.getClass().getName(),
snapshotQueryMode));
}
return snapshotQuery.map(s -> {
s.configure(configuration.asMap());
if (s instanceof BeanRegistryAware) {
@ -58,7 +93,6 @@ public SnapshotQuery createService(Configuration configuration, ServiceRegistry
return s;
})
.orElseThrow(() -> new DebeziumException(String.format("Unable to find %s snapshot query mode. Please check your configuration.", snapshotQueryMode)));
}
@Override

View File

@ -0,0 +1,249 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.snapshot;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import io.debezium.DebeziumException;
import io.debezium.annotation.ConnectorSpecific;
import io.debezium.bean.StandardBeanNames;
import io.debezium.bean.spi.BeanRegistry;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.common.BaseSourceConnector;
import io.debezium.service.spi.ServiceRegistry;
import io.debezium.snapshot.spi.SnapshotLock;
import io.debezium.spi.schema.DataCollectionId;
@RunWith(MockitoJUnitRunner.class)
public class SnapshotLockProviderTest {
@Test
public void whenBothImplementationHasConnectorSpecificAnnotationTheRightOneWillBeSelected() {
SnapshotLockProvider snapshotLockProvider = new SnapshotLockProvider(
List.of(new SnapshotLockAnnotatedCustomA(), new SnapshotLockAnnotatedCustomB()));
MockedObjects mockedObjects = getMockedObjects("myLock", MyConnectorA.class.getName());
SnapshotLock service = snapshotLockProvider.createService(mockedObjects.configuration, mockedObjects.serviceRegistry);
Assertions.assertThat(service.getClass().getName()).isEqualTo(SnapshotLockAnnotatedCustomA.class.getName());
}
@Test
public void whenOneImplementationHasConnectorSpecificAnnotationThatMatchTheRunningConnectorItWillBeSelected() {
SnapshotLockProvider snapshotLockProvider = new SnapshotLockProvider(
List.of(new SnapshotLockCustomA(), new SnapshotLockAnnotatedCustomB()));
MockedObjects mockedObjects = getMockedObjects("myLock", MyConnectorB.class.getName());
SnapshotLock service = snapshotLockProvider.createService(mockedObjects.configuration, mockedObjects.serviceRegistry);
Assertions.assertThat(service.getClass().getName()).isEqualTo(SnapshotLockAnnotatedCustomB.class.getName());
}
@Test
public void whenNoImplementationHasConnectorSpecificAnnotationThenTheFirstNotAnnotatedOneWillBeSelected() {
SnapshotLockProvider snapshotLockProvider = new SnapshotLockProvider(
List.of(new SnapshotLockCustomA(), new SnapshotLockCustomB()));
MockedObjects mockedObjects = getMockedObjects("myLock", MyConnectorB.class.getName());
SnapshotLock service = snapshotLockProvider.createService(mockedObjects.configuration, mockedObjects.serviceRegistry);
Assertions.assertThat(service.getClass().getName()).isEqualTo(SnapshotLockCustomA.class.getName());
}
@Test
public void whenNoImplementationIsFoundThenAnExceptionIsThrown() {
SnapshotLockProvider snapshotLockProvider = new SnapshotLockProvider(
List.of(new SnapshotLockCustomA(), new SnapshotLockCustomB()));
MockedObjects mockedObjects = getMockedObjects("notExisting", MyConnectorB.class.getName());
assertThrows(DebeziumException.class, () -> snapshotLockProvider.createService(mockedObjects.configuration, mockedObjects.serviceRegistry));
}
private MockedObjects getMockedObjects(String snapshotMode, String connectorClassName) {
CommonConnectorConfig commonConnectorConfig = mock(CommonConnectorConfig.class, Mockito.RETURNS_DEEP_STUBS);
EnumeratedValue enumeratedValue = mock(EnumeratedValue.class);
Mockito.<Optional<? extends EnumeratedValue>> when(commonConnectorConfig.getSnapshotLockingMode()).thenReturn(Optional.of(enumeratedValue));
when(commonConnectorConfig.getSnapshotLockingMode().get().getValue()).thenReturn(snapshotMode);
Configuration configuration = mock(Configuration.class);
when(configuration.getString("connector.class")).thenReturn(connectorClassName);
ServiceRegistry serviceRegistry = mock(ServiceRegistry.class);
BeanRegistry beanRegistry = mock(BeanRegistry.class);
when(beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, CommonConnectorConfig.class)).thenReturn(commonConnectorConfig);
when(serviceRegistry.tryGetService(BeanRegistry.class)).thenReturn(beanRegistry);
return new MockedObjects(configuration, serviceRegistry);
}
private static class MockedObjects {
public final Configuration configuration;
public final ServiceRegistry serviceRegistry;
MockedObjects(Configuration configuration, ServiceRegistry serviceRegistry) {
this.configuration = configuration;
this.serviceRegistry = serviceRegistry;
}
}
private class MyConnectorA extends BaseSourceConnector {
@Override
protected Map<String, ConfigValue> validateAllFields(Configuration config) {
return null;
}
@Override
public <T extends DataCollectionId> List<T> getMatchingCollections(Configuration config) {
return null;
}
@Override
public void start(Map<String, String> map) {
}
@Override
public Class<? extends Task> taskClass() {
return null;
}
@Override
public List<Map<String, String>> taskConfigs(int i) {
return null;
}
@Override
public void stop() {
}
@Override
public ConfigDef config() {
return null;
}
@Override
public String version() {
return null;
}
}
private class MyConnectorB extends BaseSourceConnector {
@Override
protected Map<String, ConfigValue> validateAllFields(Configuration config) {
return null;
}
@Override
public <T extends DataCollectionId> List<T> getMatchingCollections(Configuration config) {
return null;
}
@Override
public void start(Map<String, String> map) {
}
@Override
public Class<? extends Task> taskClass() {
return null;
}
@Override
public List<Map<String, String>> taskConfigs(int i) {
return null;
}
@Override
public void stop() {
}
@Override
public ConfigDef config() {
return null;
}
@Override
public String version() {
return null;
}
}
private class SnapshotLockCustomA extends AbstractSnapshotLock {
@Override
public String name() {
return "myLock";
}
}
@ConnectorSpecific(connector = MyConnectorA.class)
private class SnapshotLockAnnotatedCustomA extends AbstractSnapshotLock {
@Override
public String name() {
return "myLock";
}
}
private class SnapshotLockCustomB extends AbstractSnapshotLock {
@Override
public String name() {
return "myLock";
}
}
@ConnectorSpecific(connector = MyConnectorB.class)
private class SnapshotLockAnnotatedCustomB extends AbstractSnapshotLock {
@Override
public String name() {
return "myLock";
}
}
private abstract class AbstractSnapshotLock implements SnapshotLock {
@Override
public void configure(Map<String, ?> properties) {
}
@Override
public Optional<String> tableLockingStatement(Duration lockTimeout, String tableId) {
return Optional.empty();
}
}
}

View File

@ -0,0 +1,247 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.snapshot;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import io.debezium.DebeziumException;
import io.debezium.annotation.ConnectorSpecific;
import io.debezium.bean.StandardBeanNames;
import io.debezium.bean.spi.BeanRegistry;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.common.BaseSourceConnector;
import io.debezium.service.spi.ServiceRegistry;
import io.debezium.snapshot.spi.SnapshotQuery;
import io.debezium.spi.schema.DataCollectionId;
@RunWith(MockitoJUnitRunner.class)
public class SnapshotQueryProviderTest {
@Test
public void whenBothImplementationHasConnectorSpecificAnnotationTheRightOneWillBeSelected() {
SnapshotQueryProvider snapshotQueryProvider = new SnapshotQueryProvider(
List.of(new SnapshotQueryAnnotatedCustomA(), new SnapshotQueryAnnotatedCustomB()));
MockedObjects mockedObjects = getMockedObjects("myQuery", MyConnectorA.class.getName());
SnapshotQuery service = snapshotQueryProvider.createService(mockedObjects.configuration, mockedObjects.serviceRegistry);
Assertions.assertThat(service.getClass().getName()).isEqualTo(SnapshotQueryAnnotatedCustomA.class.getName());
}
@Test
public void whenOneImplementationHasConnectorSpecificAnnotationThatMatchTheRunningConnectorItWillBeSelected() {
SnapshotQueryProvider snapshotQueryProvider = new SnapshotQueryProvider(
List.of(new SnapshotQueryCustomA(), new SnapshotQueryAnnotatedCustomB()));
MockedObjects mockedObjects = getMockedObjects("myQuery", MyConnectorB.class.getName());
SnapshotQuery service = snapshotQueryProvider.createService(mockedObjects.configuration, mockedObjects.serviceRegistry);
Assertions.assertThat(service.getClass().getName()).isEqualTo(SnapshotQueryAnnotatedCustomB.class.getName());
}
@Test
public void whenNoImplementationHasConnectorSpecificAnnotationThenTheFirstNotAnnotatedOneWillBeSelected() {
SnapshotQueryProvider snapshotQueryProvider = new SnapshotQueryProvider(
List.of(new SnapshotQueryCustomA(), new SnapshotQueryCustomB()));
MockedObjects mockedObjects = getMockedObjects("myQuery", MyConnectorB.class.getName());
SnapshotQuery service = snapshotQueryProvider.createService(mockedObjects.configuration, mockedObjects.serviceRegistry);
Assertions.assertThat(service.getClass().getName()).isEqualTo(SnapshotQueryCustomA.class.getName());
}
@Test
public void whenNoImplementationIsFoundThenAnExceptionIsThrown() {
SnapshotQueryProvider snapshotQueryProvider = new SnapshotQueryProvider(
List.of(new SnapshotQueryCustomA(), new SnapshotQueryCustomB()));
MockedObjects mockedObjects = getMockedObjects("notExisting", MyConnectorB.class.getName());
assertThrows(DebeziumException.class, () -> snapshotQueryProvider.createService(mockedObjects.configuration, mockedObjects.serviceRegistry));
}
private MockedObjects getMockedObjects(String snapshotMode, String connectorClassName) {
CommonConnectorConfig commonConnectorConfig = mock(CommonConnectorConfig.class, Mockito.RETURNS_DEEP_STUBS);
EnumeratedValue enumeratedValue = mock(EnumeratedValue.class);
when(commonConnectorConfig.snapshotQueryMode()).thenReturn(enumeratedValue);
when(commonConnectorConfig.snapshotQueryMode().getValue()).thenReturn(snapshotMode);
Configuration configuration = mock(Configuration.class);
when(configuration.getString("connector.class")).thenReturn(connectorClassName);
ServiceRegistry serviceRegistry = mock(ServiceRegistry.class);
BeanRegistry beanRegistry = mock(BeanRegistry.class);
when(beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, CommonConnectorConfig.class)).thenReturn(commonConnectorConfig);
when(serviceRegistry.tryGetService(BeanRegistry.class)).thenReturn(beanRegistry);
return new MockedObjects(configuration, serviceRegistry);
}
private static class MockedObjects {
public final Configuration configuration;
public final ServiceRegistry serviceRegistry;
MockedObjects(Configuration configuration, ServiceRegistry serviceRegistry) {
this.configuration = configuration;
this.serviceRegistry = serviceRegistry;
}
}
private class MyConnectorA extends BaseSourceConnector {
@Override
protected Map<String, ConfigValue> validateAllFields(Configuration config) {
return null;
}
@Override
public <T extends DataCollectionId> List<T> getMatchingCollections(Configuration config) {
return null;
}
@Override
public void start(Map<String, String> map) {
}
@Override
public Class<? extends Task> taskClass() {
return null;
}
@Override
public List<Map<String, String>> taskConfigs(int i) {
return null;
}
@Override
public void stop() {
}
@Override
public ConfigDef config() {
return null;
}
@Override
public String version() {
return null;
}
}
private class MyConnectorB extends BaseSourceConnector {
@Override
protected Map<String, ConfigValue> validateAllFields(Configuration config) {
return null;
}
@Override
public <T extends DataCollectionId> List<T> getMatchingCollections(Configuration config) {
return null;
}
@Override
public void start(Map<String, String> map) {
}
@Override
public Class<? extends Task> taskClass() {
return null;
}
@Override
public List<Map<String, String>> taskConfigs(int i) {
return null;
}
@Override
public void stop() {
}
@Override
public ConfigDef config() {
return null;
}
@Override
public String version() {
return null;
}
}
private class SnapshotQueryCustomA extends AbstractSnapshotQuery {
@Override
public String name() {
return "myQuery";
}
}
@ConnectorSpecific(connector = MyConnectorA.class)
private class SnapshotQueryAnnotatedCustomA extends AbstractSnapshotQuery {
@Override
public String name() {
return "myQuery";
}
}
private class SnapshotQueryCustomB extends AbstractSnapshotQuery {
@Override
public String name() {
return "myQuery";
}
}
@ConnectorSpecific(connector = MyConnectorB.class)
private class SnapshotQueryAnnotatedCustomB extends AbstractSnapshotQuery {
@Override
public String name() {
return "myQuery";
}
}
private abstract class AbstractSnapshotQuery implements SnapshotQuery {
@Override
public void configure(Map<String, ?> properties) {
}
@Override
public Optional<String> snapshotQuery(String tableId, List<String> snapshotSelectColumns) {
return Optional.empty();
}
}
}