From 8036cd32195ffc6299042ef910a915d881e2e6ee Mon Sep 17 00:00:00 2001 From: Jakub Cechacek Date: Wed, 21 Aug 2024 15:09:24 +0200 Subject: [PATCH] DBZ-8160 Signaler creation is now responsibility of engine implementations --- .../io/debezium/engine/DebeziumEngine.java | 46 +++++++++---------- .../pipeline/signal/SignalRecord.java | 5 ++ .../embedded/async/AsyncEmbeddedEngine.java | 24 +++++----- .../embedded/async/AsyncEngineSignaler.java | 18 ++++---- 4 files changed, 46 insertions(+), 47 deletions(-) diff --git a/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java b/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java index 643c45b71..63e4c888c 100644 --- a/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java +++ b/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java @@ -47,6 +47,15 @@ public interface DebeziumEngine extends Runnable, Closeable { String OFFSET_FLUSH_INTERVAL_MS_PROP = "offset.flush.interval.ms"; + /** + * @return this engine's signaler, if it supports signaling + * @throws UnsupportedOperationException if signaling is not supported by this engine + */ + @Incubating + default Signaler getSignaler() { + throw new UnsupportedOperationException("Signaling is not supported by this engine"); + } + /** * A callback function to be notified when the connector completes. */ @@ -178,30 +187,28 @@ default boolean supportsTombstoneEvents() { } /** - * Signaler defines the contract for sending signals to connector tasks. - * @param signal type + * A record representing signal sent to the engine via {@link DebeziumEngine.Signaler}. + * @param id the unique identifier of the signal sent, usually UUID, can be used for deduplication + * @param type the unique logical name of the code executing the signal + * @param data the data in JSON format that are passed to the signal code + * @param additionalData additional data which might be required by specific signal types */ @Incubating - interface Signaler { + record Signal(String id, String type, String data, Map additionalData) { + } - /** - * Initialize the signaler with the engine. - * - *

- * Note that it is up the the implementation to decide whether it is compatible with given engine. - *

- * - * @param engine the engine instance - * @param type of engine - */ - > void init(E engine); + /** + * Signaler defines the contract for sending signals to connector tasks. + */ + @Incubating + interface Signaler { /** * Send a signal to the connector. * * @param signal the signal to send */ - void signal(T signal); + void signal(Signal signal); } /** @@ -227,15 +234,6 @@ interface Builder { */ Builder notifying(ChangeConsumer handler); - /** - * Use the specified signaler to send signals to the connector. - * @param signaler the signaler - * @return this builder object so methods can be chained together; never null - */ - default Builder using(Signaler signaler) { - return this; - }; - /** * Use the specified configuration for the connector. The configuration is assumed to already be valid. * diff --git a/debezium-core/src/main/java/io/debezium/pipeline/signal/SignalRecord.java b/debezium-core/src/main/java/io/debezium/pipeline/signal/SignalRecord.java index 1db1e979b..39194bb4a 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/signal/SignalRecord.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/signal/SignalRecord.java @@ -13,6 +13,7 @@ import io.debezium.config.CommonConnectorConfig; import io.debezium.data.Envelope; +import io.debezium.engine.DebeziumEngine; import io.debezium.pipeline.signal.actions.snapshotting.CloseIncrementalSnapshotWindow; /** @@ -38,6 +39,10 @@ public SignalRecord(String id, String type, String data, Map add this.additionalData = additionalData; } + public SignalRecord(DebeziumEngine.Signal signal) { + this(signal.id(), signal.type(), signal.data(), signal.additionalData()); + } + public static Optional buildSignalRecordFromChangeEventSource(Struct value, CommonConnectorConfig config) { if (Envelope.Operation.DELETE.code().equals(value.get(Envelope.FieldName.OPERATION))) { diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEmbeddedEngine.java b/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEmbeddedEngine.java index c9116241c..7286210ba 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEmbeddedEngine.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEmbeddedEngine.java @@ -114,6 +114,7 @@ public final class AsyncEmbeddedEngine implements DebeziumEngine, AsyncEng private final ExecutorService recordService; // A latch to make sure close() method finishes before we call completion callback, see also DBZ-7496. private final CountDownLatch shutDownLatch = new CountDownLatch(1); + private Signaler signaler; private AsyncEmbeddedEngine(Properties config, Consumer consumer, @@ -124,8 +125,7 @@ private AsyncEmbeddedEngine(Properties config, ConnectorCallback connectorCallback, OffsetCommitPolicy offsetCommitPolicy, HeaderConverter headerConverter, - Function recordConverter, - Signaler signaler) { + Function recordConverter) { this.config = Configuration.from(Objects.requireNonNull(config, "A connector configuration must be specified.")); this.consumer = consumer; @@ -137,9 +137,6 @@ private AsyncEmbeddedEngine(Properties config, this.headerConverter = headerConverter; this.recordConverter = recordConverter; this.sourceConverter = (record) -> ((EmbeddedEngineChangeEvent) record).sourceRecord(); - if (signaler != null) { - signaler.init(this); - } // Ensure either user ChangeConsumer or Consumer is provided and validate supported records ordering is provided when relevant. if (this.handler == null & this.consumer == null) { @@ -830,6 +827,14 @@ private static boolean commitOffsets(final OffsetStorageWriter offsetWriter, fin return true; } + @Override + public Signaler getSignaler() { + if (signaler == null) { + signaler = new AsyncEngineSignaler(this); + } + return signaler; + } + /** * Implementation of {@link DebeziumEngine.Builder} which creates {@link AsyncEmbeddedEngine}. */ @@ -846,7 +851,6 @@ public static final class AsyncEngineBuilder implements DebeziumEngine.Builde private HeaderConverter headerConverter; private Function recordConverter; private ConverterBuilder converterBuilder; - private Signaler signaler; AsyncEngineBuilder() { this((KeyValueHeaderChangeEventFormat) null); @@ -893,12 +897,6 @@ public Builder notifying(final ChangeConsumer handler) { return this; } - @Override - public Builder using(Signaler signaler) { - this.signaler = signaler; - return this; - } - @Override public Builder using(final Properties config) { this.config = config; @@ -945,7 +943,7 @@ public DebeziumEngine build() { recordConverter = converterBuilder.toFormat(headerConverter); } return new AsyncEmbeddedEngine(config, consumer, handler, classLoader, clock, completionCallback, connectorCallback, offsetCommitPolicy, headerConverter, - recordConverter, signaler); + recordConverter); } } diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEngineSignaler.java b/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEngineSignaler.java index 95aa2291d..68a7f956a 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEngineSignaler.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEngineSignaler.java @@ -6,6 +6,7 @@ package io.debezium.embedded.async; import java.util.List; +import java.util.Objects; import java.util.Optional; import io.debezium.DebeziumException; @@ -14,20 +15,16 @@ import io.debezium.pipeline.signal.SignalRecord; import io.debezium.pipeline.signal.channels.process.InProcessSignalChannel; -public class AsyncEngineSignaler implements DebeziumEngine.Signaler { +public class AsyncEngineSignaler implements DebeziumEngine.Signaler { private AsyncEmbeddedEngine engine; private List channels; - @Override - public > void init(E engine) { - if (!(engine instanceof AsyncEmbeddedEngine)) { - throw new DebeziumException("AsyncEngineSignaler can only be used with AsyncEmbeddedEngine"); - } - this.engine = (AsyncEmbeddedEngine) engine; + public AsyncEngineSignaler(AsyncEmbeddedEngine engine) { + this.engine = Objects.requireNonNull(engine); } - + @Override - public void signal(SignalRecord signal) { + public void signal(DebeziumEngine.Signal signal) { if (channels == null) { channels = engine.tasks().stream() .map(EngineSourceTask::debeziumConnectTask) @@ -36,6 +33,7 @@ public void signal(SignalRecord signal) { .flatMap(Optional::stream) .toList(); } - channels.forEach(channel -> channel.signal(signal)); + var sr = new SignalRecord(signal); + channels.forEach(channel -> channel.signal(sr)); } }