diff --git a/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceTask.java b/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceTask.java index 8f8c02534..c40958dbf 100644 --- a/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceTask.java +++ b/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceTask.java @@ -40,6 +40,7 @@ import io.debezium.pipeline.ChangeEventSourceCoordinator; import io.debezium.pipeline.notification.channels.NotificationChannel; import io.debezium.pipeline.signal.channels.SignalChannelReader; +import io.debezium.pipeline.signal.channels.process.SignalChannelWriter; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.Offsets; import io.debezium.pipeline.spi.Partition; @@ -277,16 +278,14 @@ public List getAvailableSignalChannels() { } /** - * Returns the first available signal channel of the given type. + * Returns the first available signal channel writer * - * @param type the type of the signal channel - * @param the type of the signal channel - * @return the first available signal channel of the given type or empty optional if not available + * @return the first available signal channel writer empty optional if not available */ - public Optional getSignalChannel(Class type) { + public Optional getAvailableSignalChannelWriter() { return getAvailableSignalChannels().stream() - .filter(type::isInstance) - .map(type::cast) + .filter(SignalChannelWriter.class::isInstance) + .map(SignalChannelWriter.class::cast) .findFirst(); } diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngineSignaler.java b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngineSignaler.java index 0a45f34e4..ba7ab7cc7 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngineSignaler.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngineSignaler.java @@ -12,9 +12,12 @@ import io.debezium.pipeline.signal.channels.process.SignalChannelWriter; public class EmbeddedEngineSignaler implements DebeziumEngine.Signaler { - private final List channels; + private final List channels; - public EmbeddedEngineSignaler(List channels) { + public EmbeddedEngineSignaler(List channels) { + if (channels == null || channels.isEmpty()) { + throw new IllegalArgumentException("At least one channel must be provided"); + } this.channels = channels; } diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEmbeddedEngine.java b/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEmbeddedEngine.java index db70ae403..9881a47a2 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEmbeddedEngine.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEmbeddedEngine.java @@ -78,8 +78,6 @@ import io.debezium.engine.source.EngineSourceTask; import io.debezium.engine.source.EngineSourceTaskContext; import io.debezium.engine.spi.OffsetCommitPolicy; -import io.debezium.pipeline.signal.channels.process.InProcessSignalChannel; -import io.debezium.pipeline.signal.channels.process.SignalChannelWriter; import io.debezium.util.DelayStrategy; /** @@ -834,11 +832,8 @@ private static boolean commitOffsets(final OffsetStorageWriter offsetWriter, fin public Signaler getSignaler() { if (signaler == null) { var channels = tasks().stream() - .map(EngineSourceTask::debeziumConnectTask) + .map(EngineSourceTask::signalChannelWriter) .flatMap(Optional::stream) - .map(task -> task.getSignalChannel(InProcessSignalChannel.class)) - .flatMap(Optional::stream) - .map(SignalChannelWriter.class::cast) .toList(); signaler = new EmbeddedEngineSignaler(channels); diff --git a/debezium-embedded/src/main/java/io/debezium/engine/source/EngineSourceTask.java b/debezium-embedded/src/main/java/io/debezium/engine/source/EngineSourceTask.java index bef8c078e..16e8e48d9 100644 --- a/debezium-embedded/src/main/java/io/debezium/engine/source/EngineSourceTask.java +++ b/debezium-embedded/src/main/java/io/debezium/engine/source/EngineSourceTask.java @@ -10,6 +10,7 @@ import org.apache.kafka.connect.source.SourceTask; import io.debezium.connector.common.BaseSourceTask; +import io.debezium.pipeline.signal.channels.process.SignalChannelWriter; /** * Implementation of {@link DebeziumSourceTask} which currently serves only as a wrapper @@ -37,9 +38,10 @@ public SourceTask connectTask() { } @SuppressWarnings("unchecked") - public Optional> debeziumConnectTask() { + public Optional signalChannelWriter() { return Optional.of(connectTask) .filter(BaseSourceTask.class::isInstance) - .map(BaseSourceTask.class::cast); + .map(BaseSourceTask.class::cast) + .flatMap(BaseSourceTask::getAvailableSignalChannelWriter); } }