DBZ-8160 Signaler creation is now responsibility of engine implementations

This commit is contained in:
Jakub Cechacek 2024-08-21 15:09:24 +02:00 committed by Jiri Pechanec
parent 4406c78a2b
commit 8036cd3219
4 changed files with 46 additions and 47 deletions

View File

@ -47,6 +47,15 @@ public interface DebeziumEngine<R> extends Runnable, Closeable {
String OFFSET_FLUSH_INTERVAL_MS_PROP = "offset.flush.interval.ms"; String OFFSET_FLUSH_INTERVAL_MS_PROP = "offset.flush.interval.ms";
/**
* @return this engine's signaler, if it supports signaling
* @throws UnsupportedOperationException if signaling is not supported by this engine
*/
@Incubating
default Signaler getSignaler() {
throw new UnsupportedOperationException("Signaling is not supported by this engine");
}
/** /**
* A callback function to be notified when the connector completes. * A callback function to be notified when the connector completes.
*/ */
@ -178,30 +187,28 @@ default boolean supportsTombstoneEvents() {
} }
/** /**
* Signaler defines the contract for sending signals to connector tasks. * A record representing signal sent to the engine via {@link DebeziumEngine.Signaler}.
* @param <T> signal type * @param id the unique identifier of the signal sent, usually UUID, can be used for deduplication
* @param type the unique logical name of the code executing the signal
* @param data the data in JSON format that are passed to the signal code
* @param additionalData additional data which might be required by specific signal types
*/ */
@Incubating @Incubating
interface Signaler<T> { record Signal(String id, String type, String data, Map<String, Object> additionalData) {
}
/** /**
* Initialize the signaler with the engine. * Signaler defines the contract for sending signals to connector tasks.
* */
* <p> @Incubating
* Note that it is up the the implementation to decide whether it is compatible with given engine. interface Signaler {
* </p>
*
* @param engine the engine instance
* @param <E> type of engine
*/
<E extends DebeziumEngine<?>> void init(E engine);
/** /**
* Send a signal to the connector. * Send a signal to the connector.
* *
* @param signal the signal to send * @param signal the signal to send
*/ */
void signal(T signal); void signal(Signal signal);
} }
/** /**
@ -227,15 +234,6 @@ interface Builder<R> {
*/ */
Builder<R> notifying(ChangeConsumer<R> handler); Builder<R> notifying(ChangeConsumer<R> handler);
/**
* Use the specified signaler to send signals to the connector.
* @param signaler the signaler
* @return this builder object so methods can be chained together; never null
*/
default Builder<R> using(Signaler<?> signaler) {
return this;
};
/** /**
* Use the specified configuration for the connector. The configuration is assumed to already be valid. * Use the specified configuration for the connector. The configuration is assumed to already be valid.
* *

View File

@ -13,6 +13,7 @@
import io.debezium.config.CommonConnectorConfig; import io.debezium.config.CommonConnectorConfig;
import io.debezium.data.Envelope; import io.debezium.data.Envelope;
import io.debezium.engine.DebeziumEngine;
import io.debezium.pipeline.signal.actions.snapshotting.CloseIncrementalSnapshotWindow; import io.debezium.pipeline.signal.actions.snapshotting.CloseIncrementalSnapshotWindow;
/** /**
@ -38,6 +39,10 @@ public SignalRecord(String id, String type, String data, Map<String, Object> add
this.additionalData = additionalData; this.additionalData = additionalData;
} }
public SignalRecord(DebeziumEngine.Signal signal) {
this(signal.id(), signal.type(), signal.data(), signal.additionalData());
}
public static Optional<SignalRecord> buildSignalRecordFromChangeEventSource(Struct value, CommonConnectorConfig config) { public static Optional<SignalRecord> buildSignalRecordFromChangeEventSource(Struct value, CommonConnectorConfig config) {
if (Envelope.Operation.DELETE.code().equals(value.get(Envelope.FieldName.OPERATION))) { if (Envelope.Operation.DELETE.code().equals(value.get(Envelope.FieldName.OPERATION))) {

View File

@ -114,6 +114,7 @@ public final class AsyncEmbeddedEngine<R> implements DebeziumEngine<R>, AsyncEng
private final ExecutorService recordService; private final ExecutorService recordService;
// A latch to make sure close() method finishes before we call completion callback, see also DBZ-7496. // A latch to make sure close() method finishes before we call completion callback, see also DBZ-7496.
private final CountDownLatch shutDownLatch = new CountDownLatch(1); private final CountDownLatch shutDownLatch = new CountDownLatch(1);
private Signaler signaler;
private AsyncEmbeddedEngine(Properties config, private AsyncEmbeddedEngine(Properties config,
Consumer<R> consumer, Consumer<R> consumer,
@ -124,8 +125,7 @@ private AsyncEmbeddedEngine(Properties config,
ConnectorCallback connectorCallback, ConnectorCallback connectorCallback,
OffsetCommitPolicy offsetCommitPolicy, OffsetCommitPolicy offsetCommitPolicy,
HeaderConverter headerConverter, HeaderConverter headerConverter,
Function<SourceRecord, R> recordConverter, Function<SourceRecord, R> recordConverter) {
Signaler<?> signaler) {
this.config = Configuration.from(Objects.requireNonNull(config, "A connector configuration must be specified.")); this.config = Configuration.from(Objects.requireNonNull(config, "A connector configuration must be specified."));
this.consumer = consumer; this.consumer = consumer;
@ -137,9 +137,6 @@ private AsyncEmbeddedEngine(Properties config,
this.headerConverter = headerConverter; this.headerConverter = headerConverter;
this.recordConverter = recordConverter; this.recordConverter = recordConverter;
this.sourceConverter = (record) -> ((EmbeddedEngineChangeEvent<?, ?, ?>) record).sourceRecord(); this.sourceConverter = (record) -> ((EmbeddedEngineChangeEvent<?, ?, ?>) record).sourceRecord();
if (signaler != null) {
signaler.init(this);
}
// Ensure either user ChangeConsumer or Consumer is provided and validate supported records ordering is provided when relevant. // Ensure either user ChangeConsumer or Consumer is provided and validate supported records ordering is provided when relevant.
if (this.handler == null & this.consumer == null) { if (this.handler == null & this.consumer == null) {
@ -830,6 +827,14 @@ private static boolean commitOffsets(final OffsetStorageWriter offsetWriter, fin
return true; return true;
} }
@Override
public Signaler getSignaler() {
if (signaler == null) {
signaler = new AsyncEngineSignaler(this);
}
return signaler;
}
/** /**
* Implementation of {@link DebeziumEngine.Builder} which creates {@link AsyncEmbeddedEngine}. * Implementation of {@link DebeziumEngine.Builder} which creates {@link AsyncEmbeddedEngine}.
*/ */
@ -846,7 +851,6 @@ public static final class AsyncEngineBuilder<R> implements DebeziumEngine.Builde
private HeaderConverter headerConverter; private HeaderConverter headerConverter;
private Function<SourceRecord, R> recordConverter; private Function<SourceRecord, R> recordConverter;
private ConverterBuilder converterBuilder; private ConverterBuilder converterBuilder;
private Signaler<?> signaler;
AsyncEngineBuilder() { AsyncEngineBuilder() {
this((KeyValueHeaderChangeEventFormat<?, ?, ?>) null); this((KeyValueHeaderChangeEventFormat<?, ?, ?>) null);
@ -893,12 +897,6 @@ public Builder<R> notifying(final ChangeConsumer<R> handler) {
return this; return this;
} }
@Override
public Builder<R> using(Signaler<?> signaler) {
this.signaler = signaler;
return this;
}
@Override @Override
public Builder<R> using(final Properties config) { public Builder<R> using(final Properties config) {
this.config = config; this.config = config;
@ -945,7 +943,7 @@ public DebeziumEngine<R> build() {
recordConverter = converterBuilder.toFormat(headerConverter); recordConverter = converterBuilder.toFormat(headerConverter);
} }
return new AsyncEmbeddedEngine(config, consumer, handler, classLoader, clock, completionCallback, connectorCallback, offsetCommitPolicy, headerConverter, return new AsyncEmbeddedEngine(config, consumer, handler, classLoader, clock, completionCallback, connectorCallback, offsetCommitPolicy, headerConverter,
recordConverter, signaler); recordConverter);
} }
} }

View File

@ -6,6 +6,7 @@
package io.debezium.embedded.async; package io.debezium.embedded.async;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import io.debezium.DebeziumException; import io.debezium.DebeziumException;
@ -14,20 +15,16 @@
import io.debezium.pipeline.signal.SignalRecord; import io.debezium.pipeline.signal.SignalRecord;
import io.debezium.pipeline.signal.channels.process.InProcessSignalChannel; import io.debezium.pipeline.signal.channels.process.InProcessSignalChannel;
public class AsyncEngineSignaler implements DebeziumEngine.Signaler<SignalRecord> { public class AsyncEngineSignaler implements DebeziumEngine.Signaler {
private AsyncEmbeddedEngine<?> engine; private AsyncEmbeddedEngine<?> engine;
private List<InProcessSignalChannel> channels; private List<InProcessSignalChannel> channels;
@Override public AsyncEngineSignaler(AsyncEmbeddedEngine<?> engine) {
public <E extends DebeziumEngine<?>> void init(E engine) { this.engine = Objects.requireNonNull(engine);
if (!(engine instanceof AsyncEmbeddedEngine)) {
throw new DebeziumException("AsyncEngineSignaler can only be used with AsyncEmbeddedEngine");
}
this.engine = (AsyncEmbeddedEngine<?>) engine;
} }
@Override @Override
public void signal(SignalRecord signal) { public void signal(DebeziumEngine.Signal signal) {
if (channels == null) { if (channels == null) {
channels = engine.tasks().stream() channels = engine.tasks().stream()
.map(EngineSourceTask::debeziumConnectTask) .map(EngineSourceTask::debeziumConnectTask)
@ -36,6 +33,7 @@ public void signal(SignalRecord signal) {
.flatMap(Optional::stream) .flatMap(Optional::stream)
.toList(); .toList();
} }
channels.forEach(channel -> channel.signal(signal)); var sr = new SignalRecord(signal);
channels.forEach(channel -> channel.signal(sr));
} }
} }