diff --git a/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceTask.java b/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceTask.java index 0a8664c61..8f8c02534 100644 --- a/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceTask.java +++ b/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceTask.java @@ -14,6 +14,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.ServiceLoader; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -67,6 +68,7 @@ public abstract class BaseSourceTask

signalChannels; protected void validateAndLoadSchemaHistory(CommonConnectorConfig config, LogPositionValidator logPositionValidator, Offsets previousOffsets, DatabaseSchema schema, @@ -259,8 +261,33 @@ public final void start(Map props) { } } + /** + * Returns the available signal channels. + *

+ * The signal channels are loaded using the {@link ServiceLoader} mechanism and cached for the lifetime of the task. + *

+ * + * @return list of loaded signal channels + */ public List getAvailableSignalChannels() { - return availableSignalChannels.stream().map(ServiceLoader.Provider::get).collect(Collectors.toList()); + if (signalChannels == null) { + signalChannels = availableSignalChannels.stream().map(ServiceLoader.Provider::get).collect(Collectors.toList()); + } + return signalChannels; + } + + /** + * Returns the first available signal channel of the given type. + * + * @param type the type of the signal channel + * @param the type of the signal channel + * @return the first available signal channel of the given type or empty optional if not available + */ + public Optional getSignalChannel(Class type) { + return getAvailableSignalChannels().stream() + .filter(type::isInstance) + .map(type::cast) + .findFirst(); } protected Configuration withMaskedSensitiveOptions(Configuration config) { @@ -542,5 +569,4 @@ protected void registerServiceProviders(ServiceRegistry serviceRegistry) { serviceRegistry.registerServiceProvider(new SnapshotQueryProvider()); serviceRegistry.registerServiceProvider(new SnapshotterServiceProvider()); } - }