From 3a04f45939680d5b6537957c780b88821527fd2c Mon Sep 17 00:00:00 2001 From: Thiago Avancini Date: Fri, 13 Aug 2021 18:00:55 -0300 Subject: [PATCH] DBZ-3815 Remove NATS connection handlers/listeners --- .../NatsStreamingChangeConsumer.java | 42 ++++++++++--------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/debezium-server/debezium-server-nats-streaming/src/main/java/io/debezium/server/nats-streaming/NatsStreamingChangeConsumer.java b/debezium-server/debezium-server-nats-streaming/src/main/java/io/debezium/server/nats-streaming/NatsStreamingChangeConsumer.java index 85d83c82e..e5d7f4742 100644 --- a/debezium-server/debezium-server-nats-streaming/src/main/java/io/debezium/server/nats-streaming/NatsStreamingChangeConsumer.java +++ b/debezium-server/debezium-server-nats-streaming/src/main/java/io/debezium/server/nats-streaming/NatsStreamingChangeConsumer.java @@ -26,9 +26,8 @@ import io.debezium.server.BaseChangeConsumer; import io.debezium.server.CustomConsumerBuilder; import io.nats.client.Connection; -import io.nats.client.ConnectionListener; +import io.nats.client.Nats; import io.nats.streaming.NatsStreaming; -import io.nats.streaming.Options; import io.nats.streaming.StreamingConnection; /** @@ -52,6 +51,7 @@ public class NatsStreamingChangeConsumer extends BaseChangeConsumer private String clusterId; private String clientId; + private Connection nc; private StreamingConnection sc; @Inject @@ -72,21 +72,18 @@ void connect() { clusterId = config.getValue(PROP_CLUSTER_ID, String.class); clientId = config.getValue(PROP_CLIENT_ID, String.class); - // Setup NATS Streaming connection - Options stanOptions = new Options.Builder() - .natsUrl(url) - .connectionListener(new ConnectionListener() { - public void connectionEvent(Connection natsConnection, Events event) { - if (event == Events.DISCONNECTED) { - throw new DebeziumException("NATS Streaming disconnected."); - } - } - }) - .connectionLostHandler((streamingConnection, e) -> { - throw new DebeziumException("NATS Streaming connection lost."); - }) - .build(); try { + // Setup NATS connection + io.nats.client.Options natsOptions = new io.nats.client.Options.Builder() + .server(url) + .noReconnect() + .build(); + nc = Nats.connect(natsOptions); + + // Setup NATS Streaming connection + io.nats.streaming.Options stanOptions = new io.nats.streaming.Options.Builder() + .natsConn(nc) + .build(); sc = NatsStreaming.connect(clusterId, clientId, stanOptions); } catch (Exception e) { @@ -99,11 +96,18 @@ public void connectionEvent(Connection natsConnection, Events event) { @PreDestroy void close() { try { - sc.close(); - LOGGER.info("NATS Streaming connection closed."); + if (sc != null) { + sc.close(); + LOGGER.info("NATS Streaming connection closed."); + } + + if (nc != null) { + nc.close(); + LOGGER.info("NATS connection closed."); + } } catch (Exception e) { - // Do nothing + throw new DebeziumException(e); } }