DBZ-8135 Caching task channels in AsyncEngineSignaler

This commit is contained in:
Jakub Cechacek 2024-08-20 11:13:13 +02:00 committed by Jiri Pechanec
parent 632eccaf16
commit 8277961085

View File

@ -5,6 +5,7 @@
*/
package io.debezium.embedded.async;
import java.util.List;
import java.util.Optional;
import io.debezium.DebeziumException;
@ -15,6 +16,7 @@
public class AsyncEngineSignaler implements DebeziumEngine.Signaler<SignalRecord> {
private AsyncEmbeddedEngine<?> engine;
private List<InProcessSignalChannel> channels;
@Override
public <E extends DebeziumEngine<?>> void init(E engine) {
@ -26,11 +28,14 @@ public <E extends DebeziumEngine<?>> void init(E engine) {
@Override
public void signal(SignalRecord signal) {
engine.tasks().stream()
.map(EngineSourceTask::debeziumConnectTask)
.flatMap(Optional::stream)
.map(task -> task.getSignalChannel(InProcessSignalChannel.class))
.flatMap(Optional::stream)
.forEach(channel -> channel.signal(signal));
if (channels == null) {
channels = engine.tasks().stream()
.map(EngineSourceTask::debeziumConnectTask)
.flatMap(Optional::stream)
.map(task -> task.getSignalChannel(InProcessSignalChannel.class))
.flatMap(Optional::stream)
.toList();
}
channels.forEach(channel -> channel.signal(signal));
}
}