DBZ-8160 AsyncEngineSignaler is now initialised with list of channel writers instead of entire engine

This commit is contained in:
Jakub Cechacek 2024-08-21 15:22:36 +02:00 committed by Jiri Pechanec
parent 8036cd3219
commit f20d05fab9
2 changed files with 16 additions and 19 deletions

View File

@ -77,6 +77,8 @@
import io.debezium.engine.source.EngineSourceTask;
import io.debezium.engine.source.EngineSourceTaskContext;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.pipeline.signal.channels.process.InProcessSignalChannel;
import io.debezium.pipeline.signal.channels.process.SignalChannelWriter;
import io.debezium.util.DelayStrategy;
/**
@ -830,7 +832,15 @@ private static boolean commitOffsets(final OffsetStorageWriter offsetWriter, fin
@Override
public Signaler getSignaler() {
if (signaler == null) {
signaler = new AsyncEngineSignaler(this);
var channels = tasks().stream()
.map(EngineSourceTask::debeziumConnectTask)
.flatMap(Optional::stream)
.map(task -> task.getSignalChannel(InProcessSignalChannel.class))
.flatMap(Optional::stream)
.map(SignalChannelWriter.class::cast)
.toList();
signaler = new AsyncEngineSignaler(channels);
}
return signaler;
}

View File

@ -6,33 +6,20 @@
package io.debezium.embedded.async;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import io.debezium.DebeziumException;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.source.EngineSourceTask;
import io.debezium.pipeline.signal.SignalRecord;
import io.debezium.pipeline.signal.channels.process.InProcessSignalChannel;
import io.debezium.pipeline.signal.channels.process.SignalChannelWriter;
public class AsyncEngineSignaler implements DebeziumEngine.Signaler {
private AsyncEmbeddedEngine<?> engine;
private List<InProcessSignalChannel> channels;
private final List<SignalChannelWriter> channels;
public AsyncEngineSignaler(AsyncEmbeddedEngine<?> engine) {
this.engine = Objects.requireNonNull(engine);
public AsyncEngineSignaler(List<SignalChannelWriter> channels) {
this.channels = channels;
}
@Override
public void signal(DebeziumEngine.Signal signal) {
if (channels == null) {
channels = engine.tasks().stream()
.map(EngineSourceTask::debeziumConnectTask)
.flatMap(Optional::stream)
.map(task -> task.getSignalChannel(InProcessSignalChannel.class))
.flatMap(Optional::stream)
.toList();
}
var sr = new SignalRecord(signal);
channels.forEach(channel -> channel.signal(sr));
}