DBZ-5772 Add custom injection point to Jetstream connector, fix test

Signed-off-by: Skezzowski <sipblai@gmail.com>
This commit is contained in:
Skezzowski 2022-11-23 07:39:45 +01:00 committed by Jiri Pechanec
parent 852a2fe4b4
commit f8fac41273
2 changed files with 14 additions and 1 deletions

View File

@ -10,8 +10,11 @@
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import javax.inject.Named;
import io.debezium.server.CustomConsumerBuilder;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
@ -54,12 +57,23 @@ public class NatsJetStreamChangeConsumer extends BaseChangeConsumer
@ConfigProperty(name = PROP_CREATE_STREAM, defaultValue = "false")
boolean createStream;
@Inject
@CustomConsumerBuilder
Instance<JetStream> customStreamingConnection;
@PostConstruct
void connect() {
// Read config
final Config config = ConfigProvider.getConfig();
String url = config.getValue(PROP_URL, String.class);
if (customStreamingConnection.isResolvable()) {
js = customStreamingConnection.get();
LOGGER.info("Obtained custom configured JetStream '{}'", js);
return;
}
try {
// Setup NATS connection
io.nats.client.Options natsOptions = new io.nats.client.Options.Builder()

View File

@ -20,7 +20,6 @@ public NatsJetStreamTestConfigSource() {
natsJetStreamTest.put("debezium.sink.nats-jetstream.url",
NatsJetStreamTestResourceLifecycleManager.getNatsContainerUrl());
natsJetStreamTest.put("debezium.sink.nats-jetstream.create-stream", "true");
natsJetStreamTest.put("debezium.sink.nats-jetstream.subjects", "asd,asd");
natsJetStreamTest.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
natsJetStreamTest.put("debezium.source.topic.prefix", "testc");
natsJetStreamTest.put("debezium.source.schema.include.list", "inventory");