DBZ-3815 Remove NATS connection handlers/listeners

This commit is contained in:
Thiago Avancini 2021-08-13 18:00:55 -03:00 committed by Gunnar Morling
parent 2791ea0e6b
commit 3a04f45939

View File

@ -26,9 +26,8 @@
import io.debezium.server.BaseChangeConsumer; import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.CustomConsumerBuilder; import io.debezium.server.CustomConsumerBuilder;
import io.nats.client.Connection; import io.nats.client.Connection;
import io.nats.client.ConnectionListener; import io.nats.client.Nats;
import io.nats.streaming.NatsStreaming; import io.nats.streaming.NatsStreaming;
import io.nats.streaming.Options;
import io.nats.streaming.StreamingConnection; import io.nats.streaming.StreamingConnection;
/** /**
@ -52,6 +51,7 @@ public class NatsStreamingChangeConsumer extends BaseChangeConsumer
private String clusterId; private String clusterId;
private String clientId; private String clientId;
private Connection nc;
private StreamingConnection sc; private StreamingConnection sc;
@Inject @Inject
@ -72,21 +72,18 @@ void connect() {
clusterId = config.getValue(PROP_CLUSTER_ID, String.class); clusterId = config.getValue(PROP_CLUSTER_ID, String.class);
clientId = config.getValue(PROP_CLIENT_ID, String.class); clientId = config.getValue(PROP_CLIENT_ID, String.class);
// Setup NATS Streaming connection
Options stanOptions = new Options.Builder()
.natsUrl(url)
.connectionListener(new ConnectionListener() {
public void connectionEvent(Connection natsConnection, Events event) {
if (event == Events.DISCONNECTED) {
throw new DebeziumException("NATS Streaming disconnected.");
}
}
})
.connectionLostHandler((streamingConnection, e) -> {
throw new DebeziumException("NATS Streaming connection lost.");
})
.build();
try { try {
// Setup NATS connection
io.nats.client.Options natsOptions = new io.nats.client.Options.Builder()
.server(url)
.noReconnect()
.build();
nc = Nats.connect(natsOptions);
// Setup NATS Streaming connection
io.nats.streaming.Options stanOptions = new io.nats.streaming.Options.Builder()
.natsConn(nc)
.build();
sc = NatsStreaming.connect(clusterId, clientId, stanOptions); sc = NatsStreaming.connect(clusterId, clientId, stanOptions);
} }
catch (Exception e) { catch (Exception e) {
@ -99,11 +96,18 @@ public void connectionEvent(Connection natsConnection, Events event) {
@PreDestroy @PreDestroy
void close() { void close() {
try { try {
sc.close(); if (sc != null) {
LOGGER.info("NATS Streaming connection closed."); sc.close();
LOGGER.info("NATS Streaming connection closed.");
}
if (nc != null) {
nc.close();
LOGGER.info("NATS connection closed.");
}
} }
catch (Exception e) { catch (Exception e) {
// Do nothing throw new DebeziumException(e);
} }
} }