DBZ-5260 Filter out unavailable replicaset members

This commit is contained in:
Tim Patterson 2022-06-16 13:27:01 +12:00 committed by Jiri Pechanec
parent 38cd6907ae
commit c0323305ac
3 changed files with 91 additions and 2 deletions

View File

@ -68,6 +68,11 @@
<artifactId>fest-assert</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>

View File

@ -17,6 +17,7 @@
import com.mongodb.MongoInterruptedException;
import com.mongodb.client.MongoClient;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.ServerConnectionState;
import com.mongodb.connection.ServerDescription;
import io.debezium.annotation.ThreadSafe;
@ -90,8 +91,9 @@ public ReplicaSets getReplicaSets() {
LOGGER.info("Checking current members of replica set at {}", seedAddresses);
if (clusterDescription != null) {
// This is a replica set ...
final List<ServerDescription> serverDescriptions = clusterDescription.getServerDescriptions();
if (serverDescriptions == null || serverDescriptions.size() == 0) {
final List<ServerDescription> serverDescriptions = clusterDescription.getServerDescriptions().stream()
.filter(x -> x.getState() == ServerConnectionState.CONNECTED).collect(Collectors.toList());
if (serverDescriptions.size() == 0) {
LOGGER.warn("Server descriptions not available, got '{}'", serverDescriptions);
}
else {

View File

@ -0,0 +1,82 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import static org.fest.assertions.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.ClusterType;
import com.mongodb.connection.ServerConnectionState;
import com.mongodb.connection.ServerDescription;
import com.mongodb.MongoException;
import com.mongodb.MongoSocketOpenException;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoClient;
public class ReplicaSetDiscoveryTest {
private ReplicaSetDiscovery replicaSetDiscovery;
private MongoDbTaskContext context;
private ConnectionContext connectionContext;
private MongoClient mongoClient;
private String seedHosts = "host1:27017,host2:27017";
@Before
public void setup() {
context = mock(MongoDbTaskContext.class);
connectionContext = mock(ConnectionContext.class);
mongoClient = mock(MongoClient.class);
when(context.getConnectionContext()).thenReturn(connectionContext);
when(connectionContext.hosts()).thenReturn(seedHosts);
when(connectionContext.clientFor(seedHosts)).thenReturn(mongoClient);
replicaSetDiscovery = new ReplicaSetDiscovery(context);
}
/**
* Testing the case where a member of a replicaset may be down.
*/
@Test
public void shouldGetFirstValidReplicaSetName() {
// Skip the config server section of ReplicaSetDiscovery.getReplicaSets
// by throwing an error here.
when(mongoClient.listDatabaseNames()).thenThrow(new MongoException("dummy"));
ServerAddress host1Address = new ServerAddress("host1");
ServerAddress host2Address = new ServerAddress("host2");
List<ServerDescription> serverDescriptions = List.of(
ServerDescription.builder()
.address(host1Address)
.state(ServerConnectionState.CONNECTING)
.exception(new MongoSocketOpenException("can't connect", host1Address))
.build(),
ServerDescription.builder()
.address(host2Address)
.state(ServerConnectionState.CONNECTED)
.setName("my_rs")
.build());
ClusterDescription clusterDescription = new ClusterDescription(
ClusterConnectionMode.MULTIPLE,
ClusterType.REPLICA_SET,
serverDescriptions);
when(mongoClient.getClusterDescription()).thenReturn(clusterDescription);
ReplicaSets replicaSets = replicaSetDiscovery.getReplicaSets();
assertThat(replicaSets.validReplicaSets().size()).isEqualTo(1);
assertThat(replicaSets.validReplicaSets().get(0).replicaSetName()).isEqualTo("my_rs");
assertThat(replicaSets.validReplicaSets().get(0).addresses()).isEqualTo(List.of(host2Address));
}
}