From f8fac41273a060467fc02742fa9cc4e12014fd13 Mon Sep 17 00:00:00 2001 From: Skezzowski Date: Wed, 23 Nov 2022 07:39:45 +0100 Subject: [PATCH] DBZ-5772 Add custom injection point to Jetstream connector, fix test Signed-off-by: Skezzowski --- .../jetstream/NatsJetStreamChangeConsumer.java | 14 ++++++++++++++ .../jetstream/NatsJetStreamTestConfigSource.java | 1 - 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/debezium-server/debezium-server-nats-jetstream/src/main/java/io/debezium/server/nats/jetstream/NatsJetStreamChangeConsumer.java b/debezium-server/debezium-server-nats-jetstream/src/main/java/io/debezium/server/nats/jetstream/NatsJetStreamChangeConsumer.java index 31f9ddd84..51d5feb30 100644 --- a/debezium-server/debezium-server-nats-jetstream/src/main/java/io/debezium/server/nats/jetstream/NatsJetStreamChangeConsumer.java +++ b/debezium-server/debezium-server-nats-jetstream/src/main/java/io/debezium/server/nats/jetstream/NatsJetStreamChangeConsumer.java @@ -10,8 +10,11 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.enterprise.context.Dependent; +import javax.enterprise.inject.Instance; +import javax.inject.Inject; import javax.inject.Named; +import io.debezium.server.CustomConsumerBuilder; import org.eclipse.microprofile.config.Config; import org.eclipse.microprofile.config.ConfigProvider; import org.eclipse.microprofile.config.inject.ConfigProperty; @@ -54,12 +57,23 @@ public class NatsJetStreamChangeConsumer extends BaseChangeConsumer @ConfigProperty(name = PROP_CREATE_STREAM, defaultValue = "false") boolean createStream; + + @Inject + @CustomConsumerBuilder + Instance customStreamingConnection; + @PostConstruct void connect() { // Read config final Config config = ConfigProvider.getConfig(); String url = config.getValue(PROP_URL, String.class); + if (customStreamingConnection.isResolvable()) { + js = customStreamingConnection.get(); + LOGGER.info("Obtained custom configured JetStream '{}'", js); + return; + } + try { // Setup NATS connection io.nats.client.Options natsOptions = new io.nats.client.Options.Builder() diff --git a/debezium-server/debezium-server-nats-jetstream/src/test/java/io/debezium/server/nats/jetstream/NatsJetStreamTestConfigSource.java b/debezium-server/debezium-server-nats-jetstream/src/test/java/io/debezium/server/nats/jetstream/NatsJetStreamTestConfigSource.java index cee5205d8..5e9abcc98 100644 --- a/debezium-server/debezium-server-nats-jetstream/src/test/java/io/debezium/server/nats/jetstream/NatsJetStreamTestConfigSource.java +++ b/debezium-server/debezium-server-nats-jetstream/src/test/java/io/debezium/server/nats/jetstream/NatsJetStreamTestConfigSource.java @@ -20,7 +20,6 @@ public NatsJetStreamTestConfigSource() { natsJetStreamTest.put("debezium.sink.nats-jetstream.url", NatsJetStreamTestResourceLifecycleManager.getNatsContainerUrl()); natsJetStreamTest.put("debezium.sink.nats-jetstream.create-stream", "true"); - natsJetStreamTest.put("debezium.sink.nats-jetstream.subjects", "asd,asd"); natsJetStreamTest.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector"); natsJetStreamTest.put("debezium.source.topic.prefix", "testc"); natsJetStreamTest.put("debezium.source.schema.include.list", "inventory");