DBZ-8135 Added Signaler support to async engine

This commit is contained in:
Jakub Cechacek 2024-08-12 16:09:29 +02:00 committed by Jiri Pechanec
parent 4a027fb072
commit dd5e6e1a16
3 changed files with 68 additions and 6 deletions

View File

@ -117,14 +117,15 @@ public final class AsyncEmbeddedEngine<R> implements DebeziumEngine<R>, AsyncEng
private AsyncEmbeddedEngine(Properties config,
Consumer<R> consumer,
DebeziumEngine.ChangeConsumer<R> handler,
ChangeConsumer<R> handler,
ClassLoader classLoader,
io.debezium.util.Clock clock,
DebeziumEngine.CompletionCallback completionCallback,
DebeziumEngine.ConnectorCallback connectorCallback,
CompletionCallback completionCallback,
ConnectorCallback connectorCallback,
OffsetCommitPolicy offsetCommitPolicy,
HeaderConverter headerConverter,
Function<SourceRecord, R> recordConverter) {
Function<SourceRecord, R> recordConverter,
Signaler<?> signaler) {
this.config = Configuration.from(Objects.requireNonNull(config, "A connector configuration must be specified."));
this.consumer = consumer;
@ -136,6 +137,9 @@ private AsyncEmbeddedEngine(Properties config,
this.headerConverter = headerConverter;
this.recordConverter = recordConverter;
this.sourceConverter = (record) -> ((EmbeddedEngineChangeEvent<?, ?, ?>) record).sourceRecord();
if (signaler != null) {
signaler.init(this);
}
// Ensure either user ChangeConsumer or Consumer is provided and validate supported records ordering is provided when relevant.
if (this.handler == null & this.consumer == null) {
@ -189,6 +193,10 @@ private AsyncEmbeddedEngine(Properties config,
offsetValueConverter.configure(internalConverterConfig, false);
}
List<EngineSourceTask> tasks() {
return tasks;
}
@Override
public void run() {
Throwable exitError = null;
@ -838,6 +846,7 @@ public static final class AsyncEngineBuilder<R> implements DebeziumEngine.Builde
private HeaderConverter headerConverter;
private Function<SourceRecord, R> recordConverter;
private ConverterBuilder converterBuilder;
private Signaler<?> signaler;
AsyncEngineBuilder() {
this((KeyValueHeaderChangeEventFormat<?, ?, ?>) null);
@ -884,6 +893,12 @@ public Builder<R> notifying(final ChangeConsumer<R> handler) {
return this;
}
@Override
public Builder<R> using(Signaler<?> signaler) {
this.signaler = signaler;
return this;
}
@Override
public Builder<R> using(final Properties config) {
this.config = config;
@ -930,7 +945,7 @@ public DebeziumEngine<R> build() {
recordConverter = converterBuilder.toFormat(headerConverter);
}
return new AsyncEmbeddedEngine(config, consumer, handler, classLoader, clock, completionCallback, connectorCallback, offsetCommitPolicy, headerConverter,
recordConverter);
recordConverter, signaler);
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.embedded.async;
import java.util.Optional;
import io.debezium.DebeziumException;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.source.EngineSourceTask;
import io.debezium.pipeline.signal.SignalRecord;
import io.debezium.pipeline.signal.channels.process.InProcessSignalChannel;
public class AsyncEngineSignaler implements DebeziumEngine.Signaler<SignalRecord> {
private AsyncEmbeddedEngine<?> engine;
@Override
public <E extends DebeziumEngine<?>> void init(E engine) {
if (!(engine instanceof AsyncEmbeddedEngine)) {
throw new DebeziumException("AsyncEngineSignaler can only be used with AsyncEmbeddedEngine");
}
this.engine = (AsyncEmbeddedEngine<?>) engine;
}
@Override
public void signal(SignalRecord signal) {
engine.tasks().stream()
.map(EngineSourceTask::debeziumConnectTask)
.flatMap(Optional::stream)
.map(task -> task.getSignalChannel(InProcessSignalChannel.class))
.flatMap(Optional::stream)
.forEach(channel -> channel.signal(signal));
}
}

View File

@ -5,8 +5,12 @@
*/
package io.debezium.engine.source;
import java.util.Optional;
import org.apache.kafka.connect.source.SourceTask;
import io.debezium.connector.common.BaseSourceTask;
/**
* Implementation of {@link DebeziumSourceTask} which currently serves only as a wrapper
* around Kafka Connect {@link SourceTask}.
@ -31,4 +35,11 @@ public DebeziumSourceTaskContext context() {
public SourceTask connectTask() {
return connectTask;
}
}
@SuppressWarnings("unchecked")
public Optional<BaseSourceTask<?, ?>> debeziumConnectTask() {
return Optional.of(connectTask)
.filter(BaseSourceTask.class::isInstance)
.map(BaseSourceTask.class::cast);
}
}