DBZ-7260 Removed ReplicaSets class

This commit is contained in:
Jakub Cechacek 2023-12-13 14:50:50 +01:00
parent 17cd2158bd
commit 85fdf034e5
5 changed files with 19 additions and 353 deletions

View File

@ -7,7 +7,6 @@
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -502,15 +501,13 @@ public static OversizeHandlingMode parse(String value, String defaultValue) {
protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 0; protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 0;
/** /**
* The {@link ReplicaSets#SEPARATOR}-separated list of connection strings * The task connection string
*/ */
public static final Field TASK_CONNECTION_STRING = Field.createInternal("mongodb.internal.task.connection.string") public static final Field TASK_CONNECTION_STRING = Field.createInternal("mongodb.internal.task.connection.string")
.withDescription("Internal use only") .withDescription("Internal use only")
.withType(Type.LIST); .withType(Type.LIST);
/**
* The {@link ReplicaSets#SEPARATOR}-separated list of connection strings
*/
public static final Field ALLOW_OFFSET_INVALIDATION = Field.createInternal("mongodb.allow.offset.invalidation") public static final Field ALLOW_OFFSET_INVALIDATION = Field.createInternal("mongodb.allow.offset.invalidation")
.withDescription("Allows offset invalidation when required by change of connection mode") .withDescription("Allows offset invalidation when required by change of connection mode")
.withDefault(false) .withDefault(false)
@ -898,7 +895,7 @@ public static ConfigDef configDef() {
private final boolean offsetInvalidationAllowed; private final boolean offsetInvalidationAllowed;
private final int snapshotMaxThreads; private final int snapshotMaxThreads;
private final int cursorMaxAwaitTimeMs; private final int cursorMaxAwaitTimeMs;
private final ReplicaSets replicaSets; private final ReplicaSet replicaSet;
private final CursorPipelineOrder cursorPipelineOrder; private final CursorPipelineOrder cursorPipelineOrder;
private final OversizeHandlingMode oversizeHandlingMode; private final OversizeHandlingMode oversizeHandlingMode;
private final FiltersMatchMode filtersMatchMode; private final FiltersMatchMode filtersMatchMode;
@ -932,7 +929,7 @@ public MongoDbConnectorConfig(Configuration config) {
this.snapshotMaxThreads = resolveSnapshotMaxThreads(config); this.snapshotMaxThreads = resolveSnapshotMaxThreads(config);
this.cursorMaxAwaitTimeMs = config.getInteger(MongoDbConnectorConfig.CURSOR_MAX_AWAIT_TIME_MS, 0); this.cursorMaxAwaitTimeMs = config.getInteger(MongoDbConnectorConfig.CURSOR_MAX_AWAIT_TIME_MS, 0);
this.replicaSets = resolveReplicaSets(config); this.replicaSet = resolveReplicaSet(config);
} }
private static int validateHosts(Configuration config, Field field, ValidationOutput problems) { private static int validateHosts(Configuration config, Field field, ValidationOutput problems) {
@ -1113,8 +1110,8 @@ public boolean isOffsetInvalidationAllowed() {
return offsetInvalidationAllowed; return offsetInvalidationAllowed;
} }
public ReplicaSets getReplicaSets() { public ReplicaSet getReplicaSet() {
return replicaSets; return replicaSet;
} }
public int getCursorMaxAwaitTime() { public int getCursorMaxAwaitTime() {
@ -1191,10 +1188,9 @@ private static int resolveSnapshotMaxThreads(Configuration config) {
return config.getInteger(SNAPSHOT_MAX_THREADS); return config.getInteger(SNAPSHOT_MAX_THREADS);
} }
private static ReplicaSets resolveReplicaSets(Configuration config) { private static ReplicaSet resolveReplicaSet(Configuration config) {
var connectionString = config.getString(MongoDbConnectorConfig.TASK_CONNECTION_STRING); var connectionString = config.getString(MongoDbConnectorConfig.TASK_CONNECTION_STRING);
var replicaSet = new ReplicaSet(connectionString); return new ReplicaSet(connectionString);
return new ReplicaSets(List.of(replicaSet));
} }
@Override @Override

View File

@ -18,7 +18,6 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.annotation.ThreadSafe; import io.debezium.annotation.ThreadSafe;
import io.debezium.bean.StandardBeanNames; import io.debezium.bean.StandardBeanNames;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
@ -188,7 +187,7 @@ protected Iterable<Field> getAllConfigurationFields() {
} }
private MongoDbOffsetContext getPreviousOffset(MongoDbConnectorConfig connectorConfig) { private MongoDbOffsetContext getPreviousOffset(MongoDbConnectorConfig connectorConfig) {
MongoDbOffsetContext.Loader loader = new MongoDbOffsetContext.Loader(connectorConfig, connectorConfig.getReplicaSets()); MongoDbOffsetContext.Loader loader = new MongoDbOffsetContext.Loader(connectorConfig, connectorConfig.getReplicaSet());
Collection<Map<String, String>> partitions = loader.getPartitions(); Collection<Map<String, String>> partitions = loader.getPartitions();
Map<Map<String, String>, Map<String, Object>> offsets = context.offsetStorageReader().offsets(partitions); Map<Map<String, String>, Map<String, Object>> offsets = context.offsetStorageReader().offsets(partitions);
@ -203,12 +202,7 @@ private MongoDbOffsetContext getPreviousOffset(MongoDbConnectorConfig connectorC
} }
private ReplicaSet getReplicaSet(MongoDbConnectorConfig connectorConfig) { private ReplicaSet getReplicaSet(MongoDbConnectorConfig connectorConfig) {
return connectorConfig.getReplicaSet();
final ReplicaSets replicaSets = connectorConfig.getReplicaSets();
if (replicaSets.size() == 0) {
throw new DebeziumException("Unable to start MongoDB connector task since no replica sets were found");
}
return replicaSets.getTheOnlyReplicaSet();
} }
@Override @Override

View File

@ -112,24 +112,23 @@ public ReplicaSetOffsetContext getReplicaSetOffsetContext(ReplicaSet replicaSet)
public static class Loader { public static class Loader {
private final ReplicaSets replicaSets; private final ReplicaSet replicaSet;
private final SourceInfo sourceInfo; private final SourceInfo sourceInfo;
public Loader(MongoDbConnectorConfig connectorConfig, ReplicaSets replicaSets) { public Loader(MongoDbConnectorConfig connectorConfig, ReplicaSet replicaSet) {
this.sourceInfo = new SourceInfo(connectorConfig); this.sourceInfo = new SourceInfo(connectorConfig);
this.replicaSets = replicaSets; this.replicaSet = replicaSet;
} }
public Collection<Map<String, String>> getPartitions() { public Collection<Map<String, String>> getPartitions() {
// todo: Consider looking up shard specific offsets from prior versions (if moving from sharded cluster with RS mode)
// todo: DBZ-1726 - follow-up by removing partition management from SourceInfo // todo: DBZ-1726 - follow-up by removing partition management from SourceInfo
final Collection<Map<String, String>> partitions = new ArrayList<>(); final Collection<Map<String, String>> partitions = new ArrayList<>();
replicaSets.onEachReplicaSet(replicaSet -> { final String name = replicaSet.replicaSetName();
final String name = replicaSet.replicaSetName(); // may be null for standalone servers
if (name != null) {
Map<String, String> partition = sourceInfo.partition(name); Map<String, String> partition = sourceInfo.partition(name);
partitions.add(partition); partitions.add(partition);
}
});
return partitions; return partitions;
} }

View File

@ -1,139 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import org.apache.kafka.connect.util.ConnectorUtils;
import io.debezium.annotation.Immutable;
import io.debezium.connector.mongodb.connection.ReplicaSet;
import io.debezium.util.Strings;
/**
* A set of replica set specifications.
*
* @author Randall Hauch
*/
@Immutable
public class ReplicaSets {
public static final String SEPARATOR = "|";
/**
* Get an instance that contains no replica sets.
*
* @return the empty instance; never null
*/
public static ReplicaSets empty() {
return new ReplicaSets(null);
}
public static ReplicaSets of(ReplicaSet... replicaSets) {
return new ReplicaSets(Arrays.asList(replicaSets));
}
private final List<ReplicaSet> replicaSets = new ArrayList<>();
/**
* Create a set of replica set specifications.
*
* @param rsSpecs the replica set specifications; may be null or empty
*/
public ReplicaSets(Collection<ReplicaSet> rsSpecs) {
if (rsSpecs != null) {
replicaSets.addAll(rsSpecs);
}
Collections.sort(replicaSets);
}
/**
* Get the number of replica sets.
*
* @return the replica set count
*/
public int size() {
return replicaSets.size();
}
/**
* Perform the supplied function on each of the replica sets
*
* @param function the consumer function; may not be null
*/
public void onEachReplicaSet(Consumer<ReplicaSet> function) {
this.replicaSets.forEach(function);
}
/**
* Subdivide this collection of replica sets into the maximum number of groups.
*
* @param maxSubdivisionCount the maximum number of subdivisions
* @param subdivisionConsumer the function to be called with each subdivision; may not be null
*/
public void subdivide(int maxSubdivisionCount, Consumer<ReplicaSets> subdivisionConsumer) {
int numGroups = Math.min(size(), maxSubdivisionCount);
ConnectorUtils.groupPartitions(all(), numGroups).stream()
.map(ReplicaSets::new)
.forEach(subdivisionConsumer);
}
/**
* Get a copy of all of the {@link ReplicaSet} objects.
*
* @return the replica set objects; never null but possibly empty
*/
public List<ReplicaSet> all() {
return new ArrayList<>(this.replicaSets);
}
/**
* Get the replica ReplicaSet -- there should be only one
*
* @return the only replica set
*/
public ReplicaSet getTheOnlyReplicaSet() {
return all().get(0);
}
/**
* Determine if one or more replica sets has been added or removed since the prior state.
*
* @param priorState the prior state of the replica sets; may be null
* @return {@code true} if the replica sets have changed since the prior state, or {@code false} otherwise
*/
public boolean haveChangedSince(ReplicaSets priorState) {
return !this.replicaSets.equals(priorState.replicaSets);
}
@Override
public int hashCode() {
return replicaSets.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj instanceof ReplicaSets) {
ReplicaSets that = (ReplicaSets) obj;
return this.replicaSets.equals(that.replicaSets);
}
return false;
}
@Override
public String toString() {
return Strings.join(";", all());
}
}

View File

@ -1,184 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
import com.mongodb.ConnectionString;
import io.debezium.connector.mongodb.connection.ConnectionStrings;
import io.debezium.connector.mongodb.connection.ReplicaSet;
/**
* @author Randall Hauch
*
*/
public class ReplicaSetsTest {
@Test
public void shouldHaveNoReplicaSetsInEmptyInstance() {
assertThat(ReplicaSets.empty().size()).isEqualTo(0);
}
@Test
public void shouldParseNullHostString() {
assertThat(ConnectionStrings.parseFromHosts(null)).isEmpty();
}
@Test
public void shouldParseEmptyHostString() {
assertThat(ConnectionStrings.parseFromHosts("")).isEmpty();
}
@Test
public void shouldParseBlankHostString() {
assertThat(ConnectionStrings.parseFromHosts("")).isEmpty();
}
@Test
public void shouldParseSingleHostStringWithStandaloneAddress() {
var cs = ConnectionStrings.parseFromHosts("localhost:27017");
assertThat(cs).hasValue("mongodb://localhost:27017/");
}
@Test
public void shouldParseHostStringWithStandaloneAddresses() {
var hosts = "localhost:27017,1.2.3.4:27017,localhost:28017,[fe80::601:9bff:feab:ec01]:27017";
var cs = ConnectionStrings.parseFromHosts(hosts);
assertThat(cs).hasValue("mongodb://" + hosts + "/");
}
@Test
public void shouldParseHostStringWithAddressAndReplicaSet() {
var cs = ConnectionStrings.parseFromHosts("myReplicaSet/localhost:27017");
assertThat(cs).hasValue("mongodb://localhost:27017/?replicaSet=myReplicaSet");
}
@Test
public void shouldParseHostStringWithIpv6AddressAndReplicaSet() {
var cs = ConnectionStrings.parseFromHosts("myReplicaSet/[fe80::601:9bff:feab:ec01]:27017");
assertThat(cs).hasValue("mongodb://[fe80::601:9bff:feab:ec01]:27017/?replicaSet=myReplicaSet");
}
@Test
public void shouldParseHostStringWithAddressesAndReplicaSet() {
var hosts = "localhost:27017,1.2.3.4:27017,localhost:28017,[fe80::601:9bff:feab:ec01]:27017";
var cs = ConnectionStrings.parseFromHosts("myReplicaSet/" + hosts);
assertThat(cs).hasValue("mongodb://" + hosts + "/?replicaSet=myReplicaSet");
}
@Test
public void shouldHaveAttributesFromConnectionString() {
var cs = new ConnectionString("mongodb://localhost:27017/?replicaSet=rs0");
var rs = new ReplicaSet(cs);
assertThat(rs.hasReplicaSetName()).isTrue();
assertThat(rs.replicaSetName()).isEqualTo("rs0");
assertThat(rs.connectionString()).isEqualTo(cs);
}
@Test
public void shouldConsiderUnchangedSameInstance() {
var rs = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0");
var sets = ReplicaSets.of(rs);
assertThat(sets.haveChangedSince(sets)).isFalse();
}
@Test
public void shouldConsiderUnchangedSameReplicaSets() {
var rs0 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0");
var rs1 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0");
var sets0 = ReplicaSets.of(rs0);
var sets1 = ReplicaSets.of(rs1);
assertThat(sets0.haveChangedSince(sets1)).isFalse();
}
@Test
public void shouldConsiderUnchangedSameReplicaSetsWithDifferentAddresses() {
var rs0 = new ReplicaSet("mongodb://1.2.3.4:27017,localhost:28017/?replicaSet=rs0");
var rs1 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0");
var rs2 = new ReplicaSet("mongodb://localhost:28017/?replicaSet=rs2");
var sets0 = ReplicaSets.of(rs0, rs2);
var sets1 = ReplicaSets.of(rs1, rs2);
assertThat(sets0.haveChangedSince(sets1)).isFalse();
}
@Test
public void shouldConsiderChangedDifferentReplicaSetsWithSameAddresses() {
var rs0 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0");
var rs1 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs1");
var sets0 = ReplicaSets.of(rs0);
var sets1 = ReplicaSets.of(rs1);
assertThat(sets0.haveChangedSince(sets1)).isTrue();
}
@Test
public void shouldConsiderChangedReplicaSetsWithAdditionalReplicaSet() {
var rs0 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0");
var rs1 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0");
var sets0 = ReplicaSets.of(rs0);
var sets1 = ReplicaSets.of(rs0, rs1);
assertThat(sets1.haveChangedSince(sets0)).isTrue();
}
@Test
public void shouldConsiderChangedReplicaSetsWithRemovedReplicaSet() {
var rs0 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0");
var rs1 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0");
var sets0 = ReplicaSets.of(rs0, rs1);
var sets1 = ReplicaSets.of(rs0);
assertThat(sets1.haveChangedSince(sets0)).isTrue();
}
@Test
public void shouldNotSubdivideOneReplicaSet() {
var rs0 = new ReplicaSet("mongodb://host0:27017,host1:27017/?replicaSet=rs0");
var sets = ReplicaSets.of(rs0);
List<ReplicaSets> divided = new ArrayList<>();
sets.subdivide(1, divided::add);
assertThat(divided.size()).isEqualTo(1);
assertThat(divided.get(0)).isEqualTo(sets);
}
@Test
public void shouldNotSubdivideMultipleReplicaSetsIntoOneGroup() {
var rs0 = new ReplicaSet("mongodb://host0:27017,host1:27017/?replicaSet=rs0");
var rs1 = new ReplicaSet("mongodb://host2:27017/?replicaSet=rs01");
var sets = ReplicaSets.of(rs0);
List<ReplicaSets> divided = new ArrayList<>();
sets.subdivide(1, divided::add);
assertThat(divided.size()).isEqualTo(1);
assertThat(divided.get(0)).isEqualTo(sets);
}
@Test
public void shouldSubdivideMultipleReplicaSetsWithIntoMultipleGroups() {
var rs0 = new ReplicaSet("mongodb://host0:27017,host1:27017/?replicaSet=rs0");
var rs1 = new ReplicaSet("mongodb://host2:27017/?replicaSet=rs01");
var sets = ReplicaSets.of(rs0, rs1);
List<ReplicaSets> divided = new ArrayList<>();
sets.subdivide(2, divided::add);
assertThat(divided.size()).isEqualTo(2);
assertThat(divided.get(0).all()).containsExactly(sets.all().get(0));
assertThat(divided.get(1).all()).containsExactly(sets.all().get(1));
}
}