DBZ-8160 Simplified access to signal channel writer instance

This commit is contained in:
Jakub Cechacek 2024-08-28 15:09:25 +02:00 committed by Jiri Pechanec
parent 0791ea7243
commit e1ae60e756
4 changed files with 16 additions and 17 deletions

View File

@ -40,6 +40,7 @@
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.notification.channels.NotificationChannel;
import io.debezium.pipeline.signal.channels.SignalChannelReader;
import io.debezium.pipeline.signal.channels.process.SignalChannelWriter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
@ -277,16 +278,14 @@ public List<SignalChannelReader> getAvailableSignalChannels() {
}
/**
* Returns the first available signal channel of the given type.
* Returns the first available signal channel writer
*
* @param type the type of the signal channel
* @param <T> the type of the signal channel
* @return the first available signal channel of the given type or empty optional if not available
* @return the first available signal channel writer empty optional if not available
*/
public <T extends SignalChannelReader> Optional<T> getSignalChannel(Class<T> type) {
public Optional<? extends SignalChannelWriter> getAvailableSignalChannelWriter() {
return getAvailableSignalChannels().stream()
.filter(type::isInstance)
.map(type::cast)
.filter(SignalChannelWriter.class::isInstance)
.map(SignalChannelWriter.class::cast)
.findFirst();
}

View File

@ -12,9 +12,12 @@
import io.debezium.pipeline.signal.channels.process.SignalChannelWriter;
public class EmbeddedEngineSignaler implements DebeziumEngine.Signaler {
private final List<SignalChannelWriter> channels;
private final List<? extends SignalChannelWriter> channels;
public EmbeddedEngineSignaler(List<SignalChannelWriter> channels) {
public EmbeddedEngineSignaler(List<? extends SignalChannelWriter> channels) {
if (channels == null || channels.isEmpty()) {
throw new IllegalArgumentException("At least one channel must be provided");
}
this.channels = channels;
}

View File

@ -78,8 +78,6 @@
import io.debezium.engine.source.EngineSourceTask;
import io.debezium.engine.source.EngineSourceTaskContext;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.pipeline.signal.channels.process.InProcessSignalChannel;
import io.debezium.pipeline.signal.channels.process.SignalChannelWriter;
import io.debezium.util.DelayStrategy;
/**
@ -834,11 +832,8 @@ private static boolean commitOffsets(final OffsetStorageWriter offsetWriter, fin
public Signaler getSignaler() {
if (signaler == null) {
var channels = tasks().stream()
.map(EngineSourceTask::debeziumConnectTask)
.map(EngineSourceTask::signalChannelWriter)
.flatMap(Optional::stream)
.map(task -> task.getSignalChannel(InProcessSignalChannel.class))
.flatMap(Optional::stream)
.map(SignalChannelWriter.class::cast)
.toList();
signaler = new EmbeddedEngineSignaler(channels);

View File

@ -10,6 +10,7 @@
import org.apache.kafka.connect.source.SourceTask;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.pipeline.signal.channels.process.SignalChannelWriter;
/**
* Implementation of {@link DebeziumSourceTask} which currently serves only as a wrapper
@ -37,9 +38,10 @@ public SourceTask connectTask() {
}
@SuppressWarnings("unchecked")
public Optional<BaseSourceTask<?, ?>> debeziumConnectTask() {
public Optional<? extends SignalChannelWriter> signalChannelWriter() {
return Optional.of(connectTask)
.filter(BaseSourceTask.class::isInstance)
.map(BaseSourceTask.class::cast);
.map(BaseSourceTask.class::cast)
.flatMap(BaseSourceTask::getAvailableSignalChannelWriter);
}
}