DBZ-8135 Exposing signal channels from BaseSourceTask

This commit is contained in:
Jakub Cechacek 2024-08-12 16:08:00 +02:00 committed by Jiri Pechanec
parent c7b8940691
commit 7e9a387045

View File

@ -14,6 +14,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -67,6 +68,7 @@ public abstract class BaseSourceTask<P extends Partition, O extends OffsetContex
private static final Duration INITIAL_POLL_PERIOD_IN_MILLIS = Duration.ofMillis(TimeUnit.SECONDS.toMillis(5));
private static final Duration MAX_POLL_PERIOD_IN_MILLIS = Duration.ofMillis(TimeUnit.HOURS.toMillis(1));
private Configuration config;
private List<SignalChannelReader> signalChannels;
protected void validateAndLoadSchemaHistory(CommonConnectorConfig config, LogPositionValidator logPositionValidator, Offsets<P, O> previousOffsets,
DatabaseSchema schema,
@ -259,8 +261,33 @@ public final void start(Map<String, String> props) {
}
}
/**
* Returns the available signal channels.
* <p>
* The signal channels are loaded using the {@link ServiceLoader} mechanism and cached for the lifetime of the task.
* </p>
*
* @return list of loaded signal channels
*/
public List<SignalChannelReader> getAvailableSignalChannels() {
return availableSignalChannels.stream().map(ServiceLoader.Provider::get).collect(Collectors.toList());
if (signalChannels == null) {
signalChannels = availableSignalChannels.stream().map(ServiceLoader.Provider::get).collect(Collectors.toList());
}
return signalChannels;
}
/**
* Returns the first available signal channel of the given type.
*
* @param type the type of the signal channel
* @param <T> the type of the signal channel
* @return the first available signal channel of the given type or empty optional if not available
*/
public <T extends SignalChannelReader> Optional<T> getSignalChannel(Class<T> type) {
return getAvailableSignalChannels().stream()
.filter(type::isInstance)
.map(type::cast)
.findFirst();
}
protected Configuration withMaskedSensitiveOptions(Configuration config) {
@ -542,5 +569,4 @@ protected void registerServiceProviders(ServiceRegistry serviceRegistry) {
serviceRegistry.registerServiceProvider(new SnapshotQueryProvider());
serviceRegistry.registerServiceProvider(new SnapshotterServiceProvider());
}
}