DBZ-8135 Implementation of in-process signal channel

This commit is contained in:
Jakub Cechacek 2024-08-12 16:07:17 +02:00 committed by Jiri Pechanec
parent 40114d1219
commit c7b8940691
3 changed files with 92 additions and 1 deletions

View File

@ -0,0 +1,75 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.pipeline.signal.channels.process;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.annotation.ThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.pipeline.signal.SignalRecord;
import io.debezium.pipeline.signal.channels.SignalChannelReader;
/**
* Implementation of {@link SignalChannelReader} that also implements {@link SignalChannelWriter}
* used for sending signal from the same JVM process
*
* <p>
* Mainly targeted at Debezium Engine
* </p>
*/
@ThreadSafe
public class InProcessSignalChannel implements SignalChannelReader, SignalChannelWriter {
private static final Logger LOGGER = LoggerFactory.getLogger(InProcessSignalChannel.class);
public static final String CHANNEL_NAME = "in-process";
private final AtomicBoolean open = new AtomicBoolean(false);
private final Queue<SignalRecord> signals = new ConcurrentLinkedQueue<>();
@Override
public String name() {
return CHANNEL_NAME;
}
@Override
public void init(CommonConnectorConfig connectorConfig) {
open.compareAndSet(false, true);
LOGGER.info("Reading signals from {} channel", CHANNEL_NAME);
}
@Override
public List<SignalRecord> read() {
return Stream.ofNullable(signals.poll()).toList();
}
@Override
public void close() {
if (open.compareAndSet(true, false)) {
drain();
}
}
private void drain() {
while (!signals.isEmpty()) {
signals.clear();
}
}
@Override
public void signal(SignalRecord signal) {
if (!open.get()) {
throw new DebeziumException("Channel already closed");
}
signals.add(signal);
}
}

View File

@ -0,0 +1,15 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.pipeline.signal.channels.process;
import io.debezium.pipeline.signal.SignalRecord;
/**
* Interface for writing signals to a channel.
*/
public interface SignalChannelWriter {
void signal(SignalRecord signal);
}

View File

@ -1,4 +1,5 @@
io.debezium.pipeline.signal.channels.SourceSignalChannel
io.debezium.pipeline.signal.channels.KafkaSignalChannel
io.debezium.pipeline.signal.channels.FileSignalChannel
io.debezium.pipeline.signal.channels.jmx.JmxSignalChannel
io.debezium.pipeline.signal.channels.jmx.JmxSignalChannel
io.debezium.pipeline.signal.channels.process.InProcessSignalChannel