DBZ-5987 Cleaning up and updating its test coverage

This commit is contained in:
jcechace 2023-03-12 10:52:29 +01:00 committed by Jiri Pechanec
parent a39c084259
commit 04dbae17ad
3 changed files with 92 additions and 333 deletions

View File

@ -5,19 +5,14 @@
*/ */
package io.debezium.connector.mongodb; package io.debezium.connector.mongodb;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.bson.BsonDocument; import org.bson.BsonDocument;
import org.bson.Document; import org.bson.Document;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoCursor;
@ -29,7 +24,6 @@
import com.mongodb.connection.ServerDescription; import com.mongodb.connection.ServerDescription;
import io.debezium.function.BlockingConsumer; import io.debezium.function.BlockingConsumer;
import io.debezium.util.Strings;
/** /**
* Utilities for working with MongoDB. * Utilities for working with MongoDB.
@ -38,43 +32,6 @@
*/ */
public class MongoUtil { public class MongoUtil {
/**
* The delimiter used between addresses.
*/
private static final String ADDRESS_DELIMITER = ",";
public static final Pattern ADDRESS_DELIMITER_PATTERN = Pattern.compile(ADDRESS_DELIMITER);
/**
* Regular expression that gets the host and (optional) port. The raw expression is {@code ([^:]+)(:(\d+))?}.
*/
private static final Pattern ADDRESS_PATTERN = Pattern.compile("([^:]+)(:(\\d+))?");
/**
* Regular expression that gets the IPv6 host and (optional) port, where the IPv6 address must be surrounded
* by square brackets. The raw expression is {@code (\[[^]]+\])(:(\d+))?}.
*/
private static final Pattern IPV6_ADDRESS_PATTERN = Pattern.compile("(\\[[^]]+\\])(:(\\d+))?");
/**
* Find the name of the replica set precedes the host addresses.
*
* @param addresses the string containing the host addresses, of the form {@code replicaSetName/...}; may not be null
* @return the replica set name, or {@code null} if no replica set name is in the string
*/
public static String replicaSetUsedIn(String addresses) {
if (addresses.startsWith("[")) {
// Just an IPv6 address, so no replica set name ...
return null;
}
// Either a replica set name + an address, or just an IPv4 address ...
int index = addresses.indexOf('/');
if (index < 0) {
return null;
}
return addresses.substring(0, index);
}
/** /**
* Perform the given operation on each of the database names. * Perform the given operation on each of the database names.
* *
@ -197,34 +154,6 @@ public static <T> boolean contains(MongoIterable<T> iterable, Predicate<T> match
return false; return false;
} }
/**
* Parse the server address string, of the form {@code host:port} or {@code host}.
* <p>
* The IP address can be either an IPv4 address, or an IPv6 address surrounded by square brackets.
*
* @param addressStr the string containing the host and port; may be null
* @return the server address, or {@code null} if the string did not contain a host or host:port pair
*/
public static ServerAddress parseAddress(String addressStr) {
if (addressStr != null) {
addressStr = addressStr.trim();
Matcher matcher = ADDRESS_PATTERN.matcher(addressStr);
if (!matcher.matches()) {
matcher = IPV6_ADDRESS_PATTERN.matcher(addressStr);
}
if (matcher.matches()) {
// Both regex have the same groups
String host = matcher.group(1);
String port = matcher.group(3);
if (port == null) {
return new ServerAddress(host.trim());
}
return new ServerAddress(host.trim(), Integer.parseInt(port));
}
}
return null;
}
/** /**
* Helper function to extract the session transaction-id from an Change Stream event. * Helper function to extract the session transaction-id from an Change Stream event.
* *
@ -240,96 +169,6 @@ public static SourceInfo.SessionTransactionId getChangeStreamSessionTransactionI
event.getTxnNumber() == null ? null : event.getTxnNumber().longValue()); event.getTxnNumber() == null ? null : event.getTxnNumber().longValue());
} }
/**
* Parse the comma-separated list of server addresses. The format of the supplied string is one of the following:
*
* <pre>
* replicaSetName/host:port
* replicaSetName/host:port,host2:port2
* replicaSetName/host:port,host2:port2,host3:port3
* host:port
* host:port,host2:port2
* host:port,host2:port2,host3:port3
* </pre>
*
* where {@code replicaSetName} is the name of the replica set, {@code host} contains the resolvable hostname or IP address of
* the server, and {@code port} is the integral port number. If the port is not provided, the
* {@link ServerAddress#defaultPort() default port} is used. If neither the host or port are provided (or
* {@code addressString} is {@code null}), then an address will use the {@link ServerAddress#defaultHost() default host} and
* {@link ServerAddress#defaultPort() default port}.
* <p>
* The IP address can be either an IPv4 address, or an IPv6 address surrounded by square brackets.
* <p>
* This method does not use the replica set name.
*
* @param addressStr the string containing a comma-separated list of host and port pairs, optionally preceded by a
* replica set name
* @return the list of server addresses; never null, but possibly empty
*/
protected static List<ServerAddress> parseAddresses(String addressStr) {
List<ServerAddress> addresses = new ArrayList<>();
if (addressStr != null) {
addressStr = addressStr.trim();
for (String address : ADDRESS_DELIMITER_PATTERN.split(addressStr)) {
String hostAndPort = null;
if (address.startsWith("[")) {
// Definitely an IPv6 address without a replica set name ...
hostAndPort = address;
}
else {
// May start with replica set name ...
int index = address.indexOf("/[");
if (index >= 0) {
if ((index + 2) < address.length()) {
// replica set name with IPv6, so use just the IPv6 address ...
hostAndPort = address.substring(index + 1);
}
else {
// replica set name with just opening bracket; this is invalid, so we'll ignore ...
continue;
}
}
else {
// possible replica set name with IPv4 only
index = address.indexOf("/");
if (index >= 0) {
if ((index + 1) < address.length()) {
// replica set name with IPv4, so use just the IPv4 address ...
hostAndPort = address.substring(index + 1);
}
else {
// replica set name with no address ...
hostAndPort = ServerAddress.defaultHost();
}
}
else {
// No replica set name with IPv4, so use the whole address ...
hostAndPort = address;
}
}
}
ServerAddress newAddress = parseAddress(hostAndPort);
if (newAddress != null) {
addresses.add(newAddress);
}
}
}
return addresses;
}
protected static String toString(ServerAddress address) {
String host = address.getHost();
if (host.contains(":")) {
// IPv6 address, so wrap with square brackets ...
return "[" + host + "]:" + address.getPort();
}
return host + ":" + address.getPort();
}
protected static String toString(List<ServerAddress> addresses) {
return Strings.join(ADDRESS_DELIMITER, addresses);
}
/** /**
* Retrieves cluster description, forcing a connection if not yet available * Retrieves cluster description, forcing a connection if not yet available
* *

View File

@ -9,17 +9,13 @@
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
import org.bson.BsonTimestamp;
import org.bson.Document; import org.bson.Document;
import org.bson.conversions.Bson; import org.bson.conversions.Bson;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import com.mongodb.CursorType;
import com.mongodb.client.FindIterable; import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoCursor;
@ -114,43 +110,5 @@ public void shouldCreateMovieDatabase() throws InterruptedException {
// Now that we've put at least one document into our collection, verify we can see the database and collection ... // Now that we've put at least one document into our collection, verify we can see the database and collection ...
assertThat(primary.databaseNames()).containsOnly("dbA", "dbB"); assertThat(primary.databaseNames()).containsOnly("dbA", "dbB");
assertThat(primary.collections()).containsOnly(new CollectionId(replicaSet.replicaSetName(), "dbA", "moviesA")); assertThat(primary.collections()).containsOnly(new CollectionId(replicaSet.replicaSetName(), "dbA", "moviesA"));
// Read oplog from beginning ...
List<Document> eventQueue = new LinkedList<>();
int minimumEventsExpected = 1;
long maxSeconds = 5;
primary.execute("read oplog from beginning", mongo -> {
Testing.debug("Getting local.oplog.rs");
BsonTimestamp oplogStart = new BsonTimestamp(1, 1);
Bson filter = Filters.and(Filters.gt("ts", oplogStart), // start just after our last position
Filters.exists("fromMigrate", false)); // skip internal movements across shards
FindIterable<Document> results = mongo.getDatabase("local")
.getCollection("oplog.rs")
.find(filter)
.sort(new Document("$natural", 1))
.oplogReplay(true) // tells Mongo to not rely on indexes
.noCursorTimeout(true) // don't timeout waiting for events
.cursorType(CursorType.TailableAwait);
Testing.debug("Reading local.oplog.rs");
try (MongoCursor<Document> cursor = results.iterator();) {
Document event = null;
long stopTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(maxSeconds);
while (System.currentTimeMillis() < stopTime && eventQueue.size() < minimumEventsExpected) {
while ((event = cursor.tryNext()) != null) {
eventQueue.add(event);
}
}
assertThat(eventQueue.size()).isGreaterThanOrEqualTo(1);
}
Testing.debug("Completed local.oplog.rs");
});
eventQueue.forEach(event -> {
Testing.print("Found: " + event);
BsonTimestamp position = event.get("ts", BsonTimestamp.class);
assert position != null;
});
} }
} }

View File

@ -6,161 +6,123 @@
package io.debezium.connector.mongodb; package io.debezium.connector.mongodb;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.junit.Test; import org.junit.Test;
import com.mongodb.MongoSocketOpenException;
import com.mongodb.ServerAddress; import com.mongodb.ServerAddress;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoIterable;
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.ClusterType;
import com.mongodb.connection.ServerConnectionState;
import com.mongodb.connection.ServerDescription;
import io.debezium.util.Collect;
/** /**
* @author Randall Hauch * Tests to verify mongodb utilities
*
*/ */
public class MongoUtilTest { public class MongoUtilTest {
private ServerAddress address;
private List<ServerAddress> addresses = new ArrayList<>();
@Test @Test
public void shouldParseIPv4ServerAddressWithoutPort() { public void shouldGetClusterDescription() {
address = MongoUtil.parseAddress("localhost"); ClusterDescription expectedClusterDescription = new ClusterDescription(
assertThat(address.getHost()).isEqualTo("localhost"); ClusterConnectionMode.MULTIPLE,
assertThat(address.getPort()).isEqualTo(ServerAddress.defaultPort()); ClusterType.REPLICA_SET,
List.of());
var client = mock(MongoClient.class);
when(client.getClusterDescription()).thenReturn(expectedClusterDescription);
var actualDescription = MongoUtil.clusterDescription(client);
assertThat(actualDescription).isEqualTo(expectedClusterDescription);
} }
@Test @Test
public void shouldParseIPv4ServerAddressWithoPort() { public void shouldGetClusterDescriptionAfterForcedConnection() {
address = MongoUtil.parseAddress("localhost:28017"); ClusterDescription unknwonClusterDescription = new ClusterDescription(
assertThat(address.getHost()).isEqualTo("localhost"); ClusterConnectionMode.MULTIPLE,
assertThat(address.getPort()).isEqualTo(28017); ClusterType.UNKNOWN,
List.of());
ClusterDescription expectedClusterDescription = new ClusterDescription(
ClusterConnectionMode.MULTIPLE,
ClusterType.REPLICA_SET,
List.of());
// > Mongodb may not connect right away which results in UNKNOWN cluster type.
// > MongoUtil.clusterDescription() forces the connection by listing databases when needed
@SuppressWarnings("unchecked")
var iterable = (MongoIterable<String>) mock(MongoIterable.class);
when(iterable.first()).thenReturn("name");
var client = mock(MongoClient.class);
when(client.getClusterDescription()).thenReturn(unknwonClusterDescription, expectedClusterDescription);
when(client.listDatabaseNames()).thenReturn(iterable);
var actualDescription = MongoUtil.clusterDescription(client);
assertThat(actualDescription).isEqualTo(expectedClusterDescription);
} }
@Test @Test
public void shouldParseIPv6ServerAddressWithoutPort() { public void shouldGetReplicaSetName() {
address = MongoUtil.parseAddress("[::1/128]"); var rsNames = Collect.arrayListOf(null, "rs0", "rs1");
assertThat(address.getHost()).isEqualTo("::1/128"); // removes brackets var addresses = Collect.arrayListOf(new ServerAddress("host0"),
assertThat(address.getPort()).isEqualTo(ServerAddress.defaultPort()); new ServerAddress("host1"),
new ServerAddress("host2"));
List<ServerDescription> serverDescriptions = List.of(
ServerDescription.builder()
.address(addresses.get(0))
.state(ServerConnectionState.CONNECTING)
.exception(new MongoSocketOpenException("can't connect", addresses.get(0)))
.build(),
ServerDescription.builder()
.address(addresses.get(1))
.state(ServerConnectionState.CONNECTED)
.setName(rsNames.get(1))
.build(),
ServerDescription.builder()
.address(addresses.get(2))
.state(ServerConnectionState.CONNECTED)
.setName(rsNames.get(2)) // In reality servers will have the same rs name
.build());
ClusterDescription clusterDescription = new ClusterDescription(
ClusterConnectionMode.MULTIPLE,
ClusterType.REPLICA_SET,
serverDescriptions);
var actualRsName = MongoUtil.replicaSetName(clusterDescription);
assertThat(actualRsName).hasValue(rsNames.get(1));
} }
@Test @Test
public void shouldParseIPv6ServerAddressWithPort() { public void shouldNotGetReplicaSetName() {
address = MongoUtil.parseAddress("[::1/128]:28017"); var address = new ServerAddress("host0");
assertThat(address.getHost()).isEqualTo("::1/128"); // removes brackets
assertThat(address.getPort()).isEqualTo(28017);
}
@Test List<ServerDescription> serverDescriptions = List.of(
public void shouldParseServerAddressesWithoutPort() { ServerDescription.builder()
addresses = MongoUtil.parseAddresses("host1,host2,[::1/128],host4"); .address(address)
assertThat(addresses.size()).isEqualTo(4); .state(ServerConnectionState.CONNECTING)
assertThat(addresses.get(0).getHost()).isEqualTo("host1"); .exception(new MongoSocketOpenException("can't connect", address))
assertThat(addresses.get(0).getPort()).isEqualTo(ServerAddress.defaultPort()); .build());
assertThat(addresses.get(1).getHost()).isEqualTo("host2");
assertThat(addresses.get(1).getPort()).isEqualTo(ServerAddress.defaultPort());
assertThat(addresses.get(2).getHost()).isEqualTo("::1/128");
assertThat(addresses.get(2).getPort()).isEqualTo(ServerAddress.defaultPort());
assertThat(addresses.get(3).getHost()).isEqualTo("host4");
assertThat(addresses.get(3).getPort()).isEqualTo(ServerAddress.defaultPort());
}
@Test ClusterDescription clusterDescription = new ClusterDescription(
public void shouldParseServerAddressesWithPort() { ClusterConnectionMode.MULTIPLE,
addresses = MongoUtil.parseAddresses("host1:2111,host2:3111,[ff02::2:ff00:0/104]:4111,host4:5111"); ClusterType.REPLICA_SET,
assertThat(addresses.size()).isEqualTo(4); serverDescriptions);
assertThat(addresses.get(0).getHost()).isEqualTo("host1");
assertThat(addresses.get(0).getPort()).isEqualTo(2111);
assertThat(addresses.get(1).getHost()).isEqualTo("host2");
assertThat(addresses.get(1).getPort()).isEqualTo(3111);
assertThat(addresses.get(2).getHost()).isEqualTo("ff02::2:ff00:0/104");
assertThat(addresses.get(2).getPort()).isEqualTo(4111);
assertThat(addresses.get(3).getHost()).isEqualTo("host4");
assertThat(addresses.get(3).getPort()).isEqualTo(5111);
}
@Test var actualRsName = MongoUtil.replicaSetName(clusterDescription);
public void shouldParseServerAddressesWithReplicaSetNameAndWithoutPort() {
addresses = MongoUtil.parseAddresses("replicaSetName/host1,host2,[::1/128],host4");
assertThat(addresses.size()).isEqualTo(4);
assertThat(addresses.get(0).getHost()).isEqualTo("host1");
assertThat(addresses.get(0).getPort()).isEqualTo(ServerAddress.defaultPort());
assertThat(addresses.get(1).getHost()).isEqualTo("host2");
assertThat(addresses.get(1).getPort()).isEqualTo(ServerAddress.defaultPort());
assertThat(addresses.get(2).getHost()).isEqualTo("::1/128");
assertThat(addresses.get(2).getPort()).isEqualTo(ServerAddress.defaultPort());
assertThat(addresses.get(3).getHost()).isEqualTo("host4");
assertThat(addresses.get(3).getPort()).isEqualTo(ServerAddress.defaultPort());
}
@Test assertThat(actualRsName).isEmpty();
public void shouldParseServerAddressesWithReplicaSetNameAndWithPort() {
addresses = MongoUtil.parseAddresses("replicaSetName/host1:2111,host2:3111,[ff02::2:ff00:0/104]:4111,host4:5111");
assertThat(addresses.size()).isEqualTo(4);
assertThat(addresses.get(0).getHost()).isEqualTo("host1");
assertThat(addresses.get(0).getPort()).isEqualTo(2111);
assertThat(addresses.get(1).getHost()).isEqualTo("host2");
assertThat(addresses.get(1).getPort()).isEqualTo(3111);
assertThat(addresses.get(2).getHost()).isEqualTo("ff02::2:ff00:0/104");
assertThat(addresses.get(2).getPort()).isEqualTo(4111);
assertThat(addresses.get(3).getHost()).isEqualTo("host4");
assertThat(addresses.get(3).getPort()).isEqualTo(5111);
}
@Test
public void shouldParseServerIPv6AddressesWithReplicaSetNameAndWithoutPort() {
addresses = MongoUtil.parseAddresses("replicaSetName/[::1/128],host2,[ff02::2:ff00:0/104],host4");
assertThat(addresses.size()).isEqualTo(4);
assertThat(addresses.get(0).getHost()).isEqualTo("::1/128");
assertThat(addresses.get(0).getPort()).isEqualTo(ServerAddress.defaultPort());
assertThat(addresses.get(1).getHost()).isEqualTo("host2");
assertThat(addresses.get(1).getPort()).isEqualTo(ServerAddress.defaultPort());
assertThat(addresses.get(2).getHost()).isEqualTo("ff02::2:ff00:0/104");
assertThat(addresses.get(2).getPort()).isEqualTo(ServerAddress.defaultPort());
assertThat(addresses.get(3).getHost()).isEqualTo("host4");
assertThat(addresses.get(3).getPort()).isEqualTo(ServerAddress.defaultPort());
}
@Test
public void shouldParseServerIPv6AddressesWithReplicaSetNameAndWithPort() {
addresses = MongoUtil.parseAddresses("replicaSetName/[::1/128]:2111,host2:3111,[ff02::2:ff00:0/104]:4111,host4:5111");
assertThat(addresses.size()).isEqualTo(4);
assertThat(addresses.get(0).getHost()).isEqualTo("::1/128");
assertThat(addresses.get(0).getPort()).isEqualTo(2111);
assertThat(addresses.get(1).getHost()).isEqualTo("host2");
assertThat(addresses.get(1).getPort()).isEqualTo(3111);
assertThat(addresses.get(2).getHost()).isEqualTo("ff02::2:ff00:0/104");
assertThat(addresses.get(2).getPort()).isEqualTo(4111);
assertThat(addresses.get(3).getHost()).isEqualTo("host4");
assertThat(addresses.get(3).getPort()).isEqualTo(5111);
}
@Test
public void shouldNotParseServerAddressesWithReplicaSetNameAndOpenBracket() {
addresses = MongoUtil.parseAddresses("replicaSetName/[");
assertThat(addresses.size()).isEqualTo(0);
}
@Test
public void shouldNotParseServerAddressesWithReplicaSetNameAndNoAddress() {
addresses = MongoUtil.parseAddresses("replicaSetName/");
assertThat(addresses.size()).isEqualTo(1);
assertThat(addresses.get(0).getHost()).isEqualTo(ServerAddress.defaultHost());
assertThat(addresses.get(0).getPort()).isEqualTo(ServerAddress.defaultPort());
}
@Test
public void shouldParseReplicaSetName() {
assertThat(MongoUtil.replicaSetUsedIn("rs0/")).isEqualTo("rs0");
assertThat(MongoUtil.replicaSetUsedIn("rs0/localhost")).isEqualTo("rs0");
assertThat(MongoUtil.replicaSetUsedIn("rs0/[::1/128]")).isEqualTo("rs0");
}
@Test
public void shouldNotParseReplicaSetName() {
assertThat(MongoUtil.replicaSetUsedIn("")).isNull();
assertThat(MongoUtil.replicaSetUsedIn("localhost")).isNull();
assertThat(MongoUtil.replicaSetUsedIn("[::1/128]")).isNull();
} }
} }