diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java index 898ad8e96..814a17a85 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java @@ -7,7 +7,6 @@ import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.regex.Pattern; @@ -502,15 +501,13 @@ public static OversizeHandlingMode parse(String value, String defaultValue) { protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 0; /** - * The {@link ReplicaSets#SEPARATOR}-separated list of connection strings + * The task connection string */ public static final Field TASK_CONNECTION_STRING = Field.createInternal("mongodb.internal.task.connection.string") .withDescription("Internal use only") .withType(Type.LIST); - /** - * The {@link ReplicaSets#SEPARATOR}-separated list of connection strings - */ + public static final Field ALLOW_OFFSET_INVALIDATION = Field.createInternal("mongodb.allow.offset.invalidation") .withDescription("Allows offset invalidation when required by change of connection mode") .withDefault(false) @@ -898,7 +895,7 @@ public static ConfigDef configDef() { private final boolean offsetInvalidationAllowed; private final int snapshotMaxThreads; private final int cursorMaxAwaitTimeMs; - private final ReplicaSets replicaSets; + private final ReplicaSet replicaSet; private final CursorPipelineOrder cursorPipelineOrder; private final OversizeHandlingMode oversizeHandlingMode; private final FiltersMatchMode filtersMatchMode; @@ -932,7 +929,7 @@ public MongoDbConnectorConfig(Configuration config) { this.snapshotMaxThreads = resolveSnapshotMaxThreads(config); this.cursorMaxAwaitTimeMs = config.getInteger(MongoDbConnectorConfig.CURSOR_MAX_AWAIT_TIME_MS, 0); - this.replicaSets = resolveReplicaSets(config); + this.replicaSet = resolveReplicaSet(config); } private static int validateHosts(Configuration config, Field field, ValidationOutput problems) { @@ -1113,8 +1110,8 @@ public boolean isOffsetInvalidationAllowed() { return offsetInvalidationAllowed; } - public ReplicaSets getReplicaSets() { - return replicaSets; + public ReplicaSet getReplicaSet() { + return replicaSet; } public int getCursorMaxAwaitTime() { @@ -1191,10 +1188,9 @@ private static int resolveSnapshotMaxThreads(Configuration config) { return config.getInteger(SNAPSHOT_MAX_THREADS); } - private static ReplicaSets resolveReplicaSets(Configuration config) { + private static ReplicaSet resolveReplicaSet(Configuration config) { var connectionString = config.getString(MongoDbConnectorConfig.TASK_CONNECTION_STRING); - var replicaSet = new ReplicaSet(connectionString); - return new ReplicaSets(List.of(replicaSet)); + return new ReplicaSet(connectionString); } @Override diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorTask.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorTask.java index e09488b60..fb7d91f5f 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorTask.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorTask.java @@ -18,7 +18,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.debezium.DebeziumException; import io.debezium.annotation.ThreadSafe; import io.debezium.bean.StandardBeanNames; import io.debezium.config.Configuration; @@ -188,7 +187,7 @@ protected Iterable getAllConfigurationFields() { } private MongoDbOffsetContext getPreviousOffset(MongoDbConnectorConfig connectorConfig) { - MongoDbOffsetContext.Loader loader = new MongoDbOffsetContext.Loader(connectorConfig, connectorConfig.getReplicaSets()); + MongoDbOffsetContext.Loader loader = new MongoDbOffsetContext.Loader(connectorConfig, connectorConfig.getReplicaSet()); Collection> partitions = loader.getPartitions(); Map, Map> offsets = context.offsetStorageReader().offsets(partitions); @@ -203,12 +202,7 @@ private MongoDbOffsetContext getPreviousOffset(MongoDbConnectorConfig connectorC } private ReplicaSet getReplicaSet(MongoDbConnectorConfig connectorConfig) { - - final ReplicaSets replicaSets = connectorConfig.getReplicaSets(); - if (replicaSets.size() == 0) { - throw new DebeziumException("Unable to start MongoDB connector task since no replica sets were found"); - } - return replicaSets.getTheOnlyReplicaSet(); + return connectorConfig.getReplicaSet(); } @Override diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbOffsetContext.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbOffsetContext.java index fd9953661..364f93680 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbOffsetContext.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbOffsetContext.java @@ -112,24 +112,23 @@ public ReplicaSetOffsetContext getReplicaSetOffsetContext(ReplicaSet replicaSet) public static class Loader { - private final ReplicaSets replicaSets; + private final ReplicaSet replicaSet; private final SourceInfo sourceInfo; - public Loader(MongoDbConnectorConfig connectorConfig, ReplicaSets replicaSets) { + public Loader(MongoDbConnectorConfig connectorConfig, ReplicaSet replicaSet) { this.sourceInfo = new SourceInfo(connectorConfig); - this.replicaSets = replicaSets; + this.replicaSet = replicaSet; } public Collection> getPartitions() { + // todo: Consider looking up shard specific offsets from prior versions (if moving from sharded cluster with RS mode) // todo: DBZ-1726 - follow-up by removing partition management from SourceInfo final Collection> partitions = new ArrayList<>(); - replicaSets.onEachReplicaSet(replicaSet -> { - final String name = replicaSet.replicaSetName(); // may be null for standalone servers - if (name != null) { - Map partition = sourceInfo.partition(name); - partitions.add(partition); - } - }); + final String name = replicaSet.replicaSetName(); + + Map partition = sourceInfo.partition(name); + partitions.add(partition); + return partitions; } diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSets.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSets.java deleted file mode 100644 index 16b453c82..000000000 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSets.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.connector.mongodb; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.function.Consumer; - -import org.apache.kafka.connect.util.ConnectorUtils; - -import io.debezium.annotation.Immutable; -import io.debezium.connector.mongodb.connection.ReplicaSet; -import io.debezium.util.Strings; - -/** - * A set of replica set specifications. - * - * @author Randall Hauch - */ -@Immutable -public class ReplicaSets { - - public static final String SEPARATOR = "|"; - - /** - * Get an instance that contains no replica sets. - * - * @return the empty instance; never null - */ - public static ReplicaSets empty() { - return new ReplicaSets(null); - } - - public static ReplicaSets of(ReplicaSet... replicaSets) { - return new ReplicaSets(Arrays.asList(replicaSets)); - } - - private final List replicaSets = new ArrayList<>(); - - /** - * Create a set of replica set specifications. - * - * @param rsSpecs the replica set specifications; may be null or empty - */ - public ReplicaSets(Collection rsSpecs) { - if (rsSpecs != null) { - replicaSets.addAll(rsSpecs); - } - Collections.sort(replicaSets); - } - - /** - * Get the number of replica sets. - * - * @return the replica set count - */ - public int size() { - return replicaSets.size(); - } - - /** - * Perform the supplied function on each of the replica sets - * - * @param function the consumer function; may not be null - */ - public void onEachReplicaSet(Consumer function) { - this.replicaSets.forEach(function); - } - - /** - * Subdivide this collection of replica sets into the maximum number of groups. - * - * @param maxSubdivisionCount the maximum number of subdivisions - * @param subdivisionConsumer the function to be called with each subdivision; may not be null - */ - public void subdivide(int maxSubdivisionCount, Consumer subdivisionConsumer) { - int numGroups = Math.min(size(), maxSubdivisionCount); - ConnectorUtils.groupPartitions(all(), numGroups).stream() - .map(ReplicaSets::new) - .forEach(subdivisionConsumer); - } - - /** - * Get a copy of all of the {@link ReplicaSet} objects. - * - * @return the replica set objects; never null but possibly empty - */ - public List all() { - return new ArrayList<>(this.replicaSets); - } - - /** - * Get the replica ReplicaSet -- there should be only one - * - * @return the only replica set - */ - public ReplicaSet getTheOnlyReplicaSet() { - return all().get(0); - } - - /** - * Determine if one or more replica sets has been added or removed since the prior state. - * - * @param priorState the prior state of the replica sets; may be null - * @return {@code true} if the replica sets have changed since the prior state, or {@code false} otherwise - */ - public boolean haveChangedSince(ReplicaSets priorState) { - return !this.replicaSets.equals(priorState.replicaSets); - } - - @Override - public int hashCode() { - return replicaSets.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - if (obj instanceof ReplicaSets) { - ReplicaSets that = (ReplicaSets) obj; - return this.replicaSets.equals(that.replicaSets); - } - return false; - } - - @Override - public String toString() { - return Strings.join(";", all()); - } - -} diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ReplicaSetsTest.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ReplicaSetsTest.java deleted file mode 100644 index 074dc49e7..000000000 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ReplicaSetsTest.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.connector.mongodb; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.util.ArrayList; -import java.util.List; - -import org.junit.Test; - -import com.mongodb.ConnectionString; - -import io.debezium.connector.mongodb.connection.ConnectionStrings; -import io.debezium.connector.mongodb.connection.ReplicaSet; - -/** - * @author Randall Hauch - * - */ -public class ReplicaSetsTest { - - @Test - public void shouldHaveNoReplicaSetsInEmptyInstance() { - assertThat(ReplicaSets.empty().size()).isEqualTo(0); - } - - @Test - public void shouldParseNullHostString() { - assertThat(ConnectionStrings.parseFromHosts(null)).isEmpty(); - } - - @Test - public void shouldParseEmptyHostString() { - assertThat(ConnectionStrings.parseFromHosts("")).isEmpty(); - } - - @Test - public void shouldParseBlankHostString() { - assertThat(ConnectionStrings.parseFromHosts("")).isEmpty(); - } - - @Test - public void shouldParseSingleHostStringWithStandaloneAddress() { - var cs = ConnectionStrings.parseFromHosts("localhost:27017"); - assertThat(cs).hasValue("mongodb://localhost:27017/"); - } - - @Test - public void shouldParseHostStringWithStandaloneAddresses() { - var hosts = "localhost:27017,1.2.3.4:27017,localhost:28017,[fe80::601:9bff:feab:ec01]:27017"; - var cs = ConnectionStrings.parseFromHosts(hosts); - assertThat(cs).hasValue("mongodb://" + hosts + "/"); - } - - @Test - public void shouldParseHostStringWithAddressAndReplicaSet() { - var cs = ConnectionStrings.parseFromHosts("myReplicaSet/localhost:27017"); - assertThat(cs).hasValue("mongodb://localhost:27017/?replicaSet=myReplicaSet"); - } - - @Test - public void shouldParseHostStringWithIpv6AddressAndReplicaSet() { - var cs = ConnectionStrings.parseFromHosts("myReplicaSet/[fe80::601:9bff:feab:ec01]:27017"); - assertThat(cs).hasValue("mongodb://[fe80::601:9bff:feab:ec01]:27017/?replicaSet=myReplicaSet"); - } - - @Test - public void shouldParseHostStringWithAddressesAndReplicaSet() { - var hosts = "localhost:27017,1.2.3.4:27017,localhost:28017,[fe80::601:9bff:feab:ec01]:27017"; - var cs = ConnectionStrings.parseFromHosts("myReplicaSet/" + hosts); - assertThat(cs).hasValue("mongodb://" + hosts + "/?replicaSet=myReplicaSet"); - } - - @Test - public void shouldHaveAttributesFromConnectionString() { - var cs = new ConnectionString("mongodb://localhost:27017/?replicaSet=rs0"); - var rs = new ReplicaSet(cs); - - assertThat(rs.hasReplicaSetName()).isTrue(); - assertThat(rs.replicaSetName()).isEqualTo("rs0"); - assertThat(rs.connectionString()).isEqualTo(cs); - } - - @Test - public void shouldConsiderUnchangedSameInstance() { - var rs = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0"); - var sets = ReplicaSets.of(rs); - - assertThat(sets.haveChangedSince(sets)).isFalse(); - } - - @Test - public void shouldConsiderUnchangedSameReplicaSets() { - var rs0 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0"); - var rs1 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0"); - var sets0 = ReplicaSets.of(rs0); - var sets1 = ReplicaSets.of(rs1); - - assertThat(sets0.haveChangedSince(sets1)).isFalse(); - } - - @Test - public void shouldConsiderUnchangedSameReplicaSetsWithDifferentAddresses() { - var rs0 = new ReplicaSet("mongodb://1.2.3.4:27017,localhost:28017/?replicaSet=rs0"); - var rs1 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0"); - var rs2 = new ReplicaSet("mongodb://localhost:28017/?replicaSet=rs2"); - var sets0 = ReplicaSets.of(rs0, rs2); - var sets1 = ReplicaSets.of(rs1, rs2); - - assertThat(sets0.haveChangedSince(sets1)).isFalse(); - } - - @Test - public void shouldConsiderChangedDifferentReplicaSetsWithSameAddresses() { - var rs0 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0"); - var rs1 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs1"); - var sets0 = ReplicaSets.of(rs0); - var sets1 = ReplicaSets.of(rs1); - - assertThat(sets0.haveChangedSince(sets1)).isTrue(); - } - - @Test - public void shouldConsiderChangedReplicaSetsWithAdditionalReplicaSet() { - var rs0 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0"); - var rs1 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0"); - var sets0 = ReplicaSets.of(rs0); - var sets1 = ReplicaSets.of(rs0, rs1); - - assertThat(sets1.haveChangedSince(sets0)).isTrue(); - } - - @Test - public void shouldConsiderChangedReplicaSetsWithRemovedReplicaSet() { - var rs0 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0"); - var rs1 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0"); - var sets0 = ReplicaSets.of(rs0, rs1); - var sets1 = ReplicaSets.of(rs0); - - assertThat(sets1.haveChangedSince(sets0)).isTrue(); - } - - @Test - public void shouldNotSubdivideOneReplicaSet() { - var rs0 = new ReplicaSet("mongodb://host0:27017,host1:27017/?replicaSet=rs0"); - var sets = ReplicaSets.of(rs0); - - List divided = new ArrayList<>(); - sets.subdivide(1, divided::add); - assertThat(divided.size()).isEqualTo(1); - assertThat(divided.get(0)).isEqualTo(sets); - } - - @Test - public void shouldNotSubdivideMultipleReplicaSetsIntoOneGroup() { - var rs0 = new ReplicaSet("mongodb://host0:27017,host1:27017/?replicaSet=rs0"); - var rs1 = new ReplicaSet("mongodb://host2:27017/?replicaSet=rs01"); - var sets = ReplicaSets.of(rs0); - - List divided = new ArrayList<>(); - sets.subdivide(1, divided::add); - assertThat(divided.size()).isEqualTo(1); - assertThat(divided.get(0)).isEqualTo(sets); - - } - - @Test - public void shouldSubdivideMultipleReplicaSetsWithIntoMultipleGroups() { - var rs0 = new ReplicaSet("mongodb://host0:27017,host1:27017/?replicaSet=rs0"); - var rs1 = new ReplicaSet("mongodb://host2:27017/?replicaSet=rs01"); - var sets = ReplicaSets.of(rs0, rs1); - - List divided = new ArrayList<>(); - sets.subdivide(2, divided::add); - assertThat(divided.size()).isEqualTo(2); - assertThat(divided.get(0).all()).containsExactly(sets.all().get(0)); - assertThat(divided.get(1).all()).containsExactly(sets.all().get(1)); - } - -}