From f305bcd9abf41097447795880cbbb0e4218cea60 Mon Sep 17 00:00:00 2001 From: jcechace Date: Wed, 1 Mar 2023 21:37:05 +0100 Subject: [PATCH] DBZ-6032 Further simplifying connection logic --- .../connector/mongodb/MongoDbConnector.java | 21 +-- .../mongodb/MongoDbConnectorConfig.java | 2 +- ...bIncrementalSnapshotChangeEventSource.java | 20 +-- .../MongoDbSnapshotChangeEventSource.java | 80 ++++------- .../MongoDbStreamingChangeEventSource.java | 64 ++++----- .../mongodb/ReplicaSetDiscovery.java | 124 ++++++++++-------- .../mongodb/ReplicaSetMonitorThread.java | 14 +- .../mongodb/connection/ConnectionContext.java | 79 ++++------- .../mongodb/connection/ConnectionStrings.java | 14 ++ .../mongodb/connection/MongoClients.java | 116 ---------------- .../connection/MongoDbClientFactory.java | 67 ++++++++++ ...ongoClient.java => MongoDbConnection.java} | 89 +++++++------ .../mongodb/AbstractMongoConnectorIT.java | 9 +- .../connector/mongodb/AbstractMongoIT.java | 13 +- .../connector/mongodb/ConnectionIT.java | 4 +- .../connector/mongodb/MongoDbSchemaIT.java | 8 -- .../mongodb/ReplicaSetDiscoveryTest.java | 2 +- .../connector/mongodb/ReplicaSetsTest.java | 2 +- .../connector/mongodb/TestHelper.java | 4 +- .../outbox/MongoEventRouterTestIT.java | 9 +- 20 files changed, 314 insertions(+), 427 deletions(-) delete mode 100644 debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/MongoClients.java create mode 100644 debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/MongoDbClientFactory.java rename debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/{RetryingMongoClient.java => MongoDbConnection.java} (66%) diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnector.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnector.java index 12c220028..2328a3b50 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnector.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnector.java @@ -118,7 +118,7 @@ public void start(Map props) { // Set up and start the thread that monitors the members of all of the replica sets ... 
replicaSetMonitorExecutor = Threads.newSingleThreadExecutor(MongoDbConnector.class, taskContext.serverName(), "replica-set-monitor"); ReplicaSetDiscovery monitor = new ReplicaSetDiscovery(taskContext); - monitorThread = new ReplicaSetMonitorThread(monitor::getReplicaSets, connectionContext.pollInterval(), + monitorThread = new ReplicaSetMonitorThread(connectionContext, monitor::getReplicaSets, connectionContext.pollInterval(), Clock.SYSTEM, () -> taskContext.configureLoggingContext("disc"), this::replicaSetsChanged); replicaSetMonitorExecutor.execute(monitorThread); logger.info("Successfully started MongoDB connector, and continuing to discover changes in replica set(s) at {}", connectionContext.maskedConnectionSeed()); @@ -186,14 +186,7 @@ public void stop() { if (replicaSetMonitorExecutor != null) { replicaSetMonitorExecutor.shutdownNow(); } - try { - if (this.connectionContext != null) { - this.connectionContext.close(); - } - } - finally { - logger.info("Stopped MongoDB connector"); - } + logger.info("Stopped MongoDB connector"); } finally { if (previousLogContext != null) { @@ -225,17 +218,17 @@ public Config validate(Map connectorConfigs) { && userValue.errorMessages().isEmpty() && passwordValue.errorMessages().isEmpty() && connectionStringValue.errorMessages().isEmpty()) { - // Try to connect to the database ... - try (ConnectionContext connContext = new ConnectionContext(config)) { - try (MongoClient client = connContext.connect()) { - client.listDatabaseNames(); - } + // Try to connect to the database ... + ConnectionContext connContext = new ConnectionContext(config); + try (MongoClient client = connContext.connect()) { + client.listDatabaseNames(); } catch (MongoException e) { hostsValue.addErrorMessage("Unable to connect: " + e.getMessage()); } } + return new Config(new ArrayList<>(results.values())); } } diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java index 194ec3e28..04d20826c 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java @@ -12,7 +12,6 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; -import io.debezium.connector.mongodb.connection.ConnectionStrings; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; @@ -32,6 +31,7 @@ import io.debezium.config.Field.ValidationOutput; import io.debezium.connector.AbstractSourceInfo; import io.debezium.connector.SourceInfoStructMaker; +import io.debezium.connector.mongodb.connection.ConnectionStrings; import io.debezium.data.Envelope; import io.debezium.schema.DefaultTopicNamingStrategy; import io.debezium.spi.schema.DataCollectionId; diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java index 56cd29407..ff52e8416 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java @@ -26,8 +26,8 
@@ import io.debezium.DebeziumException; import io.debezium.annotation.NotThreadSafe; import io.debezium.connector.mongodb.connection.ConnectionContext; +import io.debezium.connector.mongodb.connection.MongoDbConnection; import io.debezium.connector.mongodb.connection.ReplicaSet; -import io.debezium.connector.mongodb.connection.RetryingMongoClient; import io.debezium.connector.mongodb.recordemitter.MongoDbSnapshotRecordEmitter; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource; @@ -72,8 +72,7 @@ public class MongoDbIncrementalSnapshotChangeEventSource protected EventDispatcher dispatcher; protected IncrementalSnapshotContext context = null; protected final Map window = new LinkedHashMap<>(); - private RetryingMongoClient primary; - private RetryingMongoClient secondary; + private MongoDbConnection mongo; private CollectionId signallingCollectionId; @@ -180,7 +179,7 @@ protected void deduplicateWindow(DataCollectionId dataCollectionId, Object key) protected void emitWindowOpen() throws InterruptedException { final CollectionId collectionId = signallingCollectionId; final String id = context.currentChunkId() + "-open"; - primary.executeBlocking( + mongo.executeBlocking( "emit window open for chunk '" + context.currentChunkId() + "'", client -> { final MongoDatabase database = client.getDatabase(collectionId.dbName()); @@ -201,7 +200,7 @@ protected void emitWindowOpen() throws InterruptedException { protected void emitWindowClose() throws InterruptedException { final CollectionId collectionId = signallingCollectionId; final String id = context.currentChunkId() + "-close"; - primary.executeBlocking( + mongo.executeBlocking( "emit window close for chunk '" + context.currentChunkId() + "'", client -> { final MongoDatabase database = client.getDatabase(collectionId.dbName()); @@ -219,8 +218,9 @@ protected void emitWindowClose() throws InterruptedException { @Override @SuppressWarnings("unchecked") public void init(MongoDbPartition partition, OffsetContext offsetContext) { - primary = establishConnection(partition, ReadPreference.primary(), replicaSets.all().get(0)); - secondary = establishConnection(partition, ReadPreference.secondaryPreferred(), replicaSets.all().get(0)); + // Only ReplicaSet deployments are supported by incremental snapshot + // Thus assume replicaSets.size() == 1 + mongo = establishConnection(partition, ReadPreference.secondaryPreferred(), replicaSets.all().get(0)); if (offsetContext == null) { LOGGER.info("Empty incremental snapshot change event source started, no action needed"); @@ -322,7 +322,7 @@ private void nextDataCollection(MongoDbPartition partition) { private Object[] readMaximumKey() throws InterruptedException { final CollectionId collectionId = (CollectionId) currentCollection.id(); final AtomicReference key = new AtomicReference<>(); - secondary.executeBlocking("maximum key for '" + collectionId + "'", client -> { + mongo.executeBlocking("maximum key for '" + collectionId + "'", client -> { final MongoDatabase database = client.getDatabase(collectionId.dbName()); final MongoCollection collection = database.getCollection(collectionId.name()); @@ -417,7 +417,7 @@ private void createDataEventsForDataCollection(MongoDbPartition partition) throw long exportStart = clock.currentTimeInMillis(); LOGGER.debug("Exporting data chunk from collection '{}' (total {} collections)", currentCollection.id(), context.dataCollectionsToBeSnapshottedCount()); - secondary.executeBlocking("chunk query key for '" + 
currentCollection.id() + "'", client -> { + mongo.executeBlocking("chunk query key for '" + currentCollection.id() + "'", client -> { final MongoDatabase database = client.getDatabase(collectionId.dbName()); final MongoCollection collection = database.getCollection(collectionId.name(), BsonDocument.class); @@ -564,7 +564,7 @@ public void processMessage(MongoDbPartition partition, DataCollectionId dataColl } } - private RetryingMongoClient establishConnection(MongoDbPartition partition, ReadPreference preference, ReplicaSet replicaSet) { + private MongoDbConnection establishConnection(MongoDbPartition partition, ReadPreference preference, ReplicaSet replicaSet) { return connectionContext.connect(replicaSet, preference, taskContext.filters(), (desc, error) -> { // propagate authorization failures if (error.getMessage() != null && error.getMessage().startsWith(AUTHORIZATION_FAILURE_MESSAGE)) { diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java index 0654a26cd..1a6d012ba 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java @@ -34,8 +34,8 @@ import io.debezium.DebeziumException; import io.debezium.connector.SnapshotRecord; import io.debezium.connector.mongodb.connection.ConnectionContext; +import io.debezium.connector.mongodb.connection.MongoDbConnection; import io.debezium.connector.mongodb.connection.ReplicaSet; -import io.debezium.connector.mongodb.connection.RetryingMongoClient; import io.debezium.connector.mongodb.recordemitter.MongoDbSnapshotRecordEmitter; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.EventDispatcher; @@ -146,14 +146,8 @@ protected SnapshotResult doExecute(ChangeEventSourceContex aborted.set(true); } - // Shutdown executor and close connections - try { - executor.shutdown(); - } - finally { - LOGGER.info("Stopping mongodb connections"); - taskContext.getConnectionContext().close(); - } + // Shutdown the executor + executor.shutdown(); if (aborted.get()) { return SnapshotResult.aborted(); @@ -181,26 +175,17 @@ protected SnapshottingTask getSnapshottingTask(MongoDbPartition partition, Mongo // Collect which replica-sets require being snapshotted final List replicaSetSnapshots = new ArrayList<>(); - final MongoDbOffsetContext offsetContext = (MongoDbOffsetContext) previousOffset; - try { - replicaSets.onEachReplicaSet(replicaSet -> { - RetryingMongoClient mongo = null; - try { - mongo = establishConnection(partition, replicaSet, ReadPreference.primaryPreferred()); - final ReplicaSetOffsetContext rsOffsetContext = offsetContext.getReplicaSetOffsetContext(replicaSet); - if (mongo != null && isSnapshotExpected(mongo, rsOffsetContext)) { - replicaSetSnapshots.add(replicaSet); - } + + for (var replicaSet : replicaSets.all()) { + try (MongoDbConnection mongo = establishConnection(partition, replicaSet, ReadPreference.primaryPreferred())) { + final ReplicaSetOffsetContext rsOffsetContext = previousOffset.getReplicaSetOffsetContext(replicaSet); + if (mongo != null && isSnapshotExpected(mongo, rsOffsetContext)) { + replicaSetSnapshots.add(replicaSet); } - finally { - if (mongo != null) { - mongo.stop(); - } - } - }); - } - finally { - taskContext.getConnectionContext().close(); + } + catch 
(InterruptedException e) { + throw new DebeziumException("Interrupted while creating snapshotting task", e); + } } return new MongoDbSnapshottingTask(replicaSetSnapshots); @@ -213,21 +198,12 @@ protected SnapshotContext prepare(MongoD } private void snapshotReplicaSet(ChangeEventSourceContext sourceContext, MongoDbSnapshotContext ctx, ReplicaSet replicaSet) throws InterruptedException { - RetryingMongoClient mongo = null; - try { - mongo = establishConnection(ctx.partition, replicaSet, ReadPreference.secondaryPreferred()); - if (mongo != null) { - createDataEvents(sourceContext, ctx, replicaSet, mongo); - } - } - finally { - if (mongo != null) { - mongo.stop(); - } + try (MongoDbConnection mongo = establishConnection(ctx.partition, replicaSet, ReadPreference.secondaryPreferred())) { + createDataEvents(sourceContext, ctx, replicaSet, mongo); } } - private RetryingMongoClient establishConnection(MongoDbPartition partition, ReplicaSet replicaSet, ReadPreference preference) { + private MongoDbConnection establishConnection(MongoDbPartition partition, ReplicaSet replicaSet, ReadPreference preference) { return connectionContext.connect(replicaSet, preference, taskContext.filters(), (desc, error) -> { // propagate authorization failures if (error.getMessage() != null && error.getMessage().startsWith(AUTHORIZATION_FAILURE_MESSAGE)) { @@ -241,7 +217,7 @@ private RetryingMongoClient establishConnection(MongoDbPartition partition, Repl }); } - private boolean isSnapshotExpected(RetryingMongoClient mongo, ReplicaSetOffsetContext offsetContext) { + private boolean isSnapshotExpected(MongoDbConnection mongo, ReplicaSetOffsetContext offsetContext) throws InterruptedException { boolean performSnapshot = true; if (offsetContext.hasOffset()) { if (LOGGER.isInfoEnabled()) { @@ -292,17 +268,11 @@ protected void determineSnapshotOffsets(MongoDbSnapshotContext ctx, ReplicaSets final Map positions = new LinkedHashMap<>(); replicaSets.onEachReplicaSet(replicaSet -> { LOGGER.info("Determine Snapshot Offset for replica-set {}", replicaSet.replicaSetName()); - RetryingMongoClient mongo = establishConnection(ctx.partition, replicaSet, ReadPreference.primaryPreferred()); - if (mongo != null) { - try { - mongo.execute("get oplog position", client -> { - positions.put(replicaSet, MongoUtil.getOplogEntry(client, -1, LOGGER)); - }); - } - finally { - LOGGER.info("Stopping primary client"); - mongo.stop(); - } + + try (MongoDbConnection mongo = establishConnection(ctx.partition, replicaSet, ReadPreference.primaryPreferred())) { + mongo.execute("get oplog position", client -> { + positions.put(replicaSet, MongoUtil.getOplogEntry(client, -1, LOGGER)); + }); } }); @@ -311,7 +281,7 @@ protected void determineSnapshotOffsets(MongoDbSnapshotContext ctx, ReplicaSets } private void createDataEvents(ChangeEventSourceContext sourceContext, MongoDbSnapshotContext snapshotContext, ReplicaSet replicaSet, - RetryingMongoClient mongo) + MongoDbConnection mongo) throws InterruptedException { SnapshotReceiver snapshotReceiver = dispatcher.getSnapshotChangeEventReceiver(); snapshotContext.offset.preSnapshotStart(); @@ -329,7 +299,7 @@ private void createDataEvents(ChangeEventSourceContext sourceContext, MongoDbSna private void createDataEventsForReplicaSet(ChangeEventSourceContext sourceContext, MongoDbSnapshotContext snapshotContext, SnapshotReceiver snapshotReceiver, - ReplicaSet replicaSet, RetryingMongoClient mongo) + ReplicaSet replicaSet, MongoDbConnection mongo) throws InterruptedException { final String rsName = 
replicaSet.replicaSetName(); @@ -435,7 +405,7 @@ private void createDataEventsForReplicaSet(ChangeEventSourceContext sourceContex private void createDataEventsForCollection(ChangeEventSourceContext sourceContext, MongoDbSnapshotContext snapshotContext, SnapshotReceiver snapshotReceiver, - ReplicaSet replicaSet, CollectionId collectionId, RetryingMongoClient mongo) + ReplicaSet replicaSet, CollectionId collectionId, MongoDbConnection mongo) throws InterruptedException { long exportStart = clock.currentTimeInMillis(); diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java index 317ea19dc..f42635a1d 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java @@ -30,8 +30,8 @@ import io.debezium.DebeziumException; import io.debezium.connector.mongodb.connection.ConnectionContext; +import io.debezium.connector.mongodb.connection.MongoDbConnection; import io.debezium.connector.mongodb.connection.ReplicaSet; -import io.debezium.connector.mongodb.connection.RetryingMongoClient; import io.debezium.connector.mongodb.recordemitter.MongoDbChangeRecordEmitter; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.EventDispatcher; @@ -85,42 +85,28 @@ public void execute(ChangeEventSourceContext context, MongoDbPartition partition offsetContext = initializeOffsets(connectorConfig, partition, replicaSets); } - try { - if (validReplicaSets.size() == 1) { - // Streams the replica-set changes in the current thread - streamChangesForReplicaSet(context, partition, validReplicaSets.get(0), offsetContext); - } - else if (validReplicaSets.size() > 1) { - // Starts a thread for each replica-set and executes the streaming process - streamChangesForReplicaSets(context, partition, validReplicaSets, offsetContext); - } + if (validReplicaSets.size() == 1) { + // Streams the replica-set changes in the current thread + streamChangesForReplicaSet(context, partition, validReplicaSets.get(0), offsetContext); } - finally { - taskContext.getConnectionContext().close(); + else if (validReplicaSets.size() > 1) { + // Starts a thread for each replica-set and executes the streaming process + streamChangesForReplicaSets(context, partition, validReplicaSets, offsetContext); } } private void streamChangesForReplicaSet(ChangeEventSourceContext context, MongoDbPartition partition, ReplicaSet replicaSet, MongoDbOffsetContext offsetContext) { - RetryingMongoClient mongo = null; - try { - mongo = establishConnection(partition, replicaSet, ReadPreference.secondaryPreferred()); - if (mongo != null) { - final AtomicReference mongoReference = new AtomicReference<>(mongo); - mongo.execute("read from change stream on '" + replicaSet + "'", client -> { - readChangeStream(client, mongoReference.get(), replicaSet, context, offsetContext); - }); - } + try (MongoDbConnection mongo = establishConnection(partition, replicaSet, ReadPreference.secondaryPreferred())) { + final AtomicReference mongoReference = new AtomicReference<>(mongo); + mongo.execute("read from change stream on '" + replicaSet + "'", client -> { + readChangeStream(client, mongoReference.get(), replicaSet, context, offsetContext); + }); } catch (Throwable t) { LOGGER.error("Streaming for replica set {} failed", 
replicaSet.replicaSetName(), t); errorHandler.setProducerThrowable(t); } - finally { - if (mongo != null) { - mongo.stop(); - } - } } private void streamChangesForReplicaSets(ChangeEventSourceContext context, MongoDbPartition partition, @@ -153,7 +139,7 @@ private void streamChangesForReplicaSets(ChangeEventSourceContext context, Mongo executor.shutdown(); } - private RetryingMongoClient establishConnection(MongoDbPartition partition, ReplicaSet replicaSet, ReadPreference preference) { + private MongoDbConnection establishConnection(MongoDbPartition partition, ReplicaSet replicaSet, ReadPreference preference) { return connectionContext.connect(replicaSet, preference, taskContext.filters(), (desc, error) -> { // propagate authorization failures if (error.getMessage() != null && error.getMessage().startsWith(AUTHORIZATION_FAILURE_MESSAGE)) { @@ -167,7 +153,7 @@ private RetryingMongoClient establishConnection(MongoDbPartition partition, Repl }); } - private void readChangeStream(MongoClient client, RetryingMongoClient mongo, ReplicaSet replicaSet, ChangeEventSourceContext context, + private void readChangeStream(MongoClient client, MongoDbConnection mongo, ReplicaSet replicaSet, ChangeEventSourceContext context, MongoDbOffsetContext offsetContext) { final ReplicaSetPartition rsPartition = offsetContext.getReplicaSetPartition(replicaSet); final ReplicaSetOffsetContext rsOffsetContext = offsetContext.getReplicaSetOffsetContext(replicaSet); @@ -271,17 +257,11 @@ protected MongoDbOffsetContext initializeOffsets(MongoDbConnectorConfig connecto final Map positions = new LinkedHashMap<>(); replicaSets.onEachReplicaSet(replicaSet -> { LOGGER.info("Determine Snapshot Offset for replica-set {}", replicaSet.replicaSetName()); - RetryingMongoClient mongo = establishConnection(partition, replicaSet, ReadPreference.primaryPreferred()); - if (mongo != null) { - try { - mongo.execute("get oplog position", client -> { - positions.put(replicaSet, MongoUtil.getOplogEntry(client, -1, LOGGER)); - }); - } - finally { - LOGGER.info("Stopping primary client"); - mongo.stop(); - } + + try (MongoDbConnection mongo = establishConnection(partition, replicaSet, ReadPreference.primaryPreferred())) { + mongo.execute("get oplog position", client -> { + positions.put(replicaSet, MongoUtil.getOplogEntry(client, -1, LOGGER)); + }); } }); @@ -295,11 +275,11 @@ protected MongoDbOffsetContext initializeOffsets(MongoDbConnectorConfig connecto private static class ReplicaSetChangeStreamsContext { private final ReplicaSetPartition partition; private final ReplicaSetOffsetContext offset; - private final RetryingMongoClient mongo; + private final MongoDbConnection mongo; private final ReplicaSet replicaSet; ReplicaSetChangeStreamsContext(ReplicaSetPartition partition, ReplicaSetOffsetContext offsetContext, - RetryingMongoClient mongo, ReplicaSet replicaSet) { + MongoDbConnection mongo, ReplicaSet replicaSet) { this.partition = partition; this.offset = offsetContext; this.mongo = mongo; @@ -314,7 +294,7 @@ ReplicaSetOffsetContext getOffset() { return offset; } - RetryingMongoClient getMongo() { + MongoDbConnection getMongo() { return mongo; } diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetDiscovery.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetDiscovery.java index c495b2440..f42eb8e7c 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetDiscovery.java +++ 
b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetDiscovery.java @@ -8,10 +8,10 @@ import java.util.HashSet; import java.util.Set; -import io.debezium.connector.mongodb.connection.ConnectionStrings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.mongodb.ConnectionString; import com.mongodb.MongoException; import com.mongodb.MongoInterruptedException; import com.mongodb.client.MongoClient; @@ -20,6 +20,7 @@ import io.debezium.annotation.ThreadSafe; import io.debezium.connector.mongodb.connection.ConnectionContext; +import io.debezium.connector.mongodb.connection.ConnectionStrings; import io.debezium.connector.mongodb.connection.ReplicaSet; /** @@ -34,6 +35,7 @@ public class ReplicaSetDiscovery { * The database that might be used to check for replica set information in a sharded cluster. */ public static final String CONFIG_DATABASE_NAME = "config"; + public static final String SHARDS_COLLECTION_NAME = "shards"; /** * The database that might be used to check for member information in a replica set. @@ -61,74 +63,88 @@ public ReplicaSetDiscovery(MongoDbTaskContext context) { * * @return the information about the replica sets; never null but possibly empty */ - public ReplicaSets getReplicaSets() { + public ReplicaSets getReplicaSets(MongoClient client) { ConnectionContext connectionContext = context.getConnectionContext(); - MongoClient client = connectionContext.connect(); Set replicaSetSpecs = new HashSet<>(); final ClusterDescription clusterDescription = MongoUtil.clusterDescription(client); if (clusterDescription.getType() == ClusterType.SHARDED) { LOGGER.info("Cluster at {} identified as sharded cluster", maskedConnectionSeed); - - // Gather connection details to each shard ... - String shardsCollection = "shards"; - try { - MongoUtil.onCollectionDocuments(client, CONFIG_DATABASE_NAME, shardsCollection, doc -> { - String shardName = doc.getString("_id"); - String hostStr = doc.getString("host"); - - LOGGER.info("Reading shard details for {}", shardName); - - ConnectionStrings.parseFromHosts(hostStr).ifPresentOrElse( - cs -> replicaSetSpecs.add(new ReplicaSet(cs)), - () -> LOGGER.info("Shard {} is not a valid replica set", shardName)); - }); - } - catch (MongoInterruptedException e) { - LOGGER.error("Interrupted while reading the '{}' collection in the '{}' database: {}", - shardsCollection, CONFIG_DATABASE_NAME, e.getMessage(), e); - Thread.currentThread().interrupt(); - } - catch (MongoException e) { - LOGGER.error("Error while reading the '{}' collection in the '{}' database: {}", - shardsCollection, CONFIG_DATABASE_NAME, e.getMessage(), e); - } + readReplicaSetsFromShardedCluster(replicaSetSpecs, client); } if (clusterDescription.getType() == ClusterType.REPLICA_SET) { LOGGER.info("Cluster at '{}' identified as replicaSet", maskedConnectionSeed); - - var connectionString = connectionContext.connectionSeed(); - var cs = connectionContext.connectionString(); - - if (cs.getRequiredReplicaSetName() == null) { - // Java driver is smart enough to connect correctly - // However replicaSet parameter is mandatory, and we need the name for offset storage - LOGGER.warn("Replica set not specified in '{}'", maskedConnectionSeed); - LOGGER.warn("Parameter 'replicaSet' should be added to connection string"); - LOGGER.warn("Trying to determine replica set name for '{}'", maskedConnectionSeed); - var rsName = MongoUtil.replicaSetName(clusterDescription); - - if (rsName.isPresent()) { - LOGGER.info("Found '{}' replica set for '{}'", rsName.get(), 
maskedConnectionSeed); - connectionString = ConnectionStrings.appendParameter(connectionString, "replicaSet", rsName.get()); - } - else { - LOGGER.warn("Unable to find replica set name for '{}'", maskedConnectionSeed); - } - } - - LOGGER.info("Using '{}' as replica set connection string", ConnectionStrings.mask(connectionString)); - replicaSetSpecs.add(new ReplicaSet(connectionString)); + readReplicaSetsFromCluster(replicaSetSpecs, clusterDescription, connectionContext); } if (replicaSetSpecs.isEmpty()) { - // Without a replica sets, we can't do anything ... - LOGGER.error( - "Found no replica sets at {}, so there is nothing to monitor and no connector tasks will be started. Check seed addresses in connector configuration.", - maskedConnectionSeed); + LOGGER.error("Found no replica sets at {}, so there is nothing to monitor and no connector tasks will be started.", maskedConnectionSeed); } + return new ReplicaSets(replicaSetSpecs); } + + private void readReplicaSetsFromCluster(Set replicaSetSpecs, ClusterDescription clusterDescription, ConnectionContext connectionContext) { + var connectionString = ensureReplicaSetName(connectionContext.connectionSeed(), clusterDescription); + + LOGGER.info("Using '{}' as replica set connection string", ConnectionStrings.mask(connectionString)); + replicaSetSpecs.add(new ReplicaSet(connectionString)); + } + + private void readReplicaSetsFromShardedCluster(Set replicaSetSpecs, MongoClient client) { + try { + MongoUtil.onCollectionDocuments(client, CONFIG_DATABASE_NAME, SHARDS_COLLECTION_NAME, doc -> { + String shardName = doc.getString("_id"); + String hostStr = doc.getString("host"); + + LOGGER.info("Reading shard details for {}", shardName); + + ConnectionStrings.parseFromHosts(hostStr).ifPresentOrElse( + cs -> replicaSetSpecs.add(new ReplicaSet(cs)), + () -> LOGGER.info("Shard {} is not a valid replica set", shardName)); + }); + } + catch (MongoInterruptedException e) { + LOGGER.error("Interrupted while reading the '{}' collection in the '{}' database: {}", + SHARDS_COLLECTION_NAME, CONFIG_DATABASE_NAME, e.getMessage(), e); + Thread.currentThread().interrupt(); + } + catch (MongoException e) { + LOGGER.error("Error while reading the '{}' collection in the '{}' database: {}", + SHARDS_COLLECTION_NAME, CONFIG_DATABASE_NAME, e.getMessage(), e); + } + } + + /** + * Ensures the connection string contains the replicaSet parameter. If the connection string doesn't contain the parameter, + * the replica set name is read from the cluster description and the parameter is added.
+ * + * @param connectionString the original connection string + * @param clusterDescription cluster description + * @return connection string with replicaSet parameter + */ + private String ensureReplicaSetName(String connectionString, ClusterDescription clusterDescription) { + // If we have replicaSet parameter then just return + var cs = new ConnectionString(connectionString); + if (cs.getRequiredReplicaSetName() != null) { + return connectionString; + } + + // Otherwise Java driver is smart enough to connect correctly + // However replicaSet parameter is mandatory, and we need the name for offset storage + LOGGER.warn("Replica set not specified in '{}'", maskedConnectionSeed); + LOGGER.warn("Parameter 'replicaSet' should be added to connection string"); + LOGGER.warn("Trying to determine replica set name for '{}'", maskedConnectionSeed); + var rsName = MongoUtil.replicaSetName(clusterDescription); + + if (rsName.isPresent()) { + LOGGER.info("Found '{}' replica set for '{}'", rsName.get(), maskedConnectionSeed); + return ConnectionStrings.appendParameter(connectionString, "replicaSet", rsName.get()); + } + + LOGGER.warn("Unable to find replica set name for '{}'", maskedConnectionSeed); + return connectionString; + } } diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetMonitorThread.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetMonitorThread.java index 110d3834b..a107f9806 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetMonitorThread.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetMonitorThread.java @@ -9,13 +9,15 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import java.util.function.Supplier; +import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.mongodb.MongoInterruptedException; +import com.mongodb.client.MongoClient; +import io.debezium.connector.mongodb.connection.ConnectionContext; import io.debezium.util.Clock; import io.debezium.util.Metronome; @@ -30,7 +32,8 @@ public final class ReplicaSetMonitorThread implements Runnable { private final Logger logger = LoggerFactory.getLogger(getClass()); private final Metronome metronome; private final CountDownLatch initialized = new CountDownLatch(1); - private final Supplier monitor; + private final ConnectionContext connectionContext; + private final Function monitor; private final Consumer onChange; private final Runnable onStartup; private volatile ReplicaSets replicaSets = ReplicaSets.empty(); @@ -42,11 +45,12 @@ public final class ReplicaSetMonitorThread implements Runnable { * @param onStartup the function to call when the thread is started; may be null if not needed * @param onChange the function to call when the set of replica set specifications has changed; may be null if not needed */ - public ReplicaSetMonitorThread(Supplier monitor, Duration period, Clock clock, Runnable onStartup, + public ReplicaSetMonitorThread(ConnectionContext connectionContext, Function monitor, Duration period, Clock clock, Runnable onStartup, Consumer onChange) { if (clock == null) { clock = Clock.system(); } + this.connectionContext = connectionContext; this.monitor = monitor; this.metronome = Metronome.sleeper(period, clock); this.onChange = onChange != null ? 
onChange : (rsSpecs) -> { @@ -62,9 +66,9 @@ public void run() { } while (!Thread.currentThread().isInterrupted()) { - try { + try (var client = connectionContext.connect()) { ReplicaSets previousReplicaSets = replicaSets; - replicaSets = monitor.get(); + replicaSets = monitor.apply(client); initialized.countDown(); // Determine if any replica set specifications have changed ... if (replicaSets.haveChangedSince(previousReplicaSets)) { diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/ConnectionContext.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/ConnectionContext.java index 2e07a566b..c8fa4618d 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/ConnectionContext.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/ConnectionContext.java @@ -7,7 +7,6 @@ import java.time.Duration; import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,12 +25,12 @@ * @author Randall Hauch * */ -public class ConnectionContext implements AutoCloseable { +public class ConnectionContext { private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionContext.class); - protected final Configuration config; - protected final MongoClients pool; + private final Configuration config; + private final MongoDbClientFactory clientFactory; /** * @param config the configuration @@ -39,8 +38,6 @@ public class ConnectionContext implements AutoCloseable { public ConnectionContext(Configuration config) { this.config = config; - final ConnectionString connectionString = connectionString(); - final String username = config.getString(MongoDbConnectorConfig.USER); final String password = config.getString(MongoDbConnectorConfig.PASSWORD); final String adminDbName = config.getString(MongoDbConnectorConfig.AUTH_SOURCE); @@ -53,42 +50,34 @@ public ConnectionContext(Configuration config) { final int serverSelectionTimeoutMs = config.getInteger(MongoDbConnectorConfig.SERVER_SELECTION_TIMEOUT_MS); // Set up the client pool so that it ... 
- MongoClients.Builder poolBuilder = MongoClients.create(); + clientFactory = MongoDbClientFactory.create(settings -> { + settings.applyToSocketSettings(builder -> builder.connectTimeout(connectTimeoutMs, TimeUnit.MILLISECONDS) + .readTimeout(socketTimeoutMs, TimeUnit.MILLISECONDS)) + .applyToClusterSettings( + builder -> builder.serverSelectionTimeout(serverSelectionTimeoutMs, TimeUnit.MILLISECONDS)) + .applyToServerSettings( + builder -> builder.heartbeatFrequency(heartbeatFrequencyMs, TimeUnit.MILLISECONDS)); - poolBuilder.settings() - .applyToSocketSettings(builder -> builder.connectTimeout(connectTimeoutMs, TimeUnit.MILLISECONDS) - .readTimeout(socketTimeoutMs, TimeUnit.MILLISECONDS)) - .applyToClusterSettings( - builder -> builder.serverSelectionTimeout(serverSelectionTimeoutMs, TimeUnit.MILLISECONDS)) - .applyToServerSettings( - builder -> builder.heartbeatFrequency(heartbeatFrequencyMs, TimeUnit.MILLISECONDS)); + // Use credential if provided as properties + if (username != null || password != null) { + settings.credential(MongoCredential.createCredential(username, adminDbName, password.toCharArray())); + } + if (useSSL) { + settings.applyToSslSettings( + builder -> builder.enabled(true).invalidHostNameAllowed(sslAllowInvalidHostnames)); + } - // Use credential if provided as properties - if (username != null || password != null) { - poolBuilder.withCredential(MongoCredential.createCredential(username, adminDbName, password.toCharArray())); - } - if (useSSL) { - poolBuilder.settings().applyToSslSettings( - builder -> builder.enabled(true).invalidHostNameAllowed(sslAllowInvalidHostnames)); - } - - poolBuilder.settings() - .applyToSocketSettings(builder -> builder.connectTimeout(connectTimeoutMs, TimeUnit.MILLISECONDS) - .readTimeout(socketTimeoutMs, TimeUnit.MILLISECONDS)) - .applyToClusterSettings( - builder -> builder.serverSelectionTimeout(serverSelectionTimeoutMs, TimeUnit.MILLISECONDS)); - - pool = poolBuilder.build(); + settings.applyToSocketSettings(builder -> builder.connectTimeout(connectTimeoutMs, TimeUnit.MILLISECONDS) + .readTimeout(socketTimeoutMs, TimeUnit.MILLISECONDS)) + .applyToClusterSettings( + builder -> builder.serverSelectionTimeout(serverSelectionTimeoutMs, TimeUnit.MILLISECONDS)); + }); } public MongoDbConnectorConfig getConnectorConfig() { return new MongoDbConnectorConfig(config); } - protected Logger logger() { - return LOGGER; - } - /** * Initial connection string which is either a host specification or connection string * @@ -121,7 +110,7 @@ public Duration pollInterval() { } public MongoClient connect() { - return pool.client(connectionString()); + return clientFactory.client(connectionString()); } /** @@ -130,23 +119,11 @@ public MongoClient connect() { * @param replicaSet the replica set information; may not be null * @param filters the filter configuration * @param errorHandler the function to be called whenever the node is unable to - * {@link RetryingMongoClient#execute(String, BlockingConsumer)} execute} an operation to completion; may be null + * {@link MongoDbConnection#execute(String, BlockingConsumer)} execute} an operation to completion; may be null * @return the client, or {@code null} if no primary could be found for the replica set */ - public RetryingMongoClient connect(ReplicaSet replicaSet, ReadPreference preference, Filters filters, - BiConsumer errorHandler) { - return new RetryingMongoClient(replicaSet, preference, pool::client, filters, errorHandler); - } - - @Override - public final void close() { - try { - // Closing all 
connections ... - logger().info("Closing all connections to {}", maskedConnectionSeed()); - pool.clear(); - } - catch (Throwable e) { - logger().error("Unexpected error shutting down the MongoDB clients", e); - } + public MongoDbConnection connect(ReplicaSet replicaSet, ReadPreference preference, Filters filters, + MongoDbConnection.ErrorHandler errorHandler) { + return new MongoDbConnection(replicaSet, preference, clientFactory, filters, errorHandler); } } diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/ConnectionStrings.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/ConnectionStrings.java index 16a3f3c08..084162f98 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/ConnectionStrings.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/ConnectionStrings.java @@ -39,6 +39,14 @@ public static String buildFromHosts(String hosts) { return parseFromHosts(hosts).orElseThrow(() -> new DebeziumException("Unable to build connection string")); } + /** + * Appends new parameter to connection string + * + * @param connectionString original connection string + * @param name parameter name + * @param value parameter value + * @return new connection string with added parameter + */ public static String appendParameter(String connectionString, String name, String value) { var param = name + "=" + URLEncoder.encode(value, StandardCharsets.UTF_8); @@ -57,6 +65,12 @@ public static String appendParameter(String connectionString, String name, Strin return connectionString + "&" + param; } + /** + * Mask credential information in connection string + * + * @param connectionString original connection string + * @return connection string with masked credential information + */ public static String mask(String connectionString) { var cs = new ConnectionString(connectionString); var credentials = cs.getCredential(); diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/MongoClients.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/MongoClients.java deleted file mode 100644 index 42c63095f..000000000 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/MongoClients.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.connector.mongodb.connection; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Consumer; - -import com.mongodb.ConnectionString; -import com.mongodb.MongoClientSettings; -import com.mongodb.MongoCredential; -import com.mongodb.ReadPreference; -import com.mongodb.client.MongoClient; - -import io.debezium.annotation.ThreadSafe; - -/** - * A connection pool of MongoClient instances. This pool supports creating clients that communicate explicitly with a single - * server, or clients that communicate with any members of a replica set or sharded cluster given a set of seed addresses. - * - * @author Randall Hauch - */ -@ThreadSafe -public class MongoClients { - - /** - * Obtain a builder that can be used to configure and {@link Builder#build() create} a connection pool. 
- * - * @return the new builder; never null - */ - public static Builder create() { - return new Builder(); - } - - /** - * Configures and builds a ConnectionPool. - */ - public static class Builder { - private final MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder(); - - /** - * Add the given {@link MongoCredential} for use when creating clients. - * - * @param credential the credential; may be {@code null}, though this method does nothing if {@code null} - * @return this builder object so methods can be chained; never null - */ - public Builder withCredential(MongoCredential credential) { - if (credential != null) { - settingsBuilder.credential(credential); - } - return this; - } - - /** - * Obtain the options builder for client connections. - * - * @return the option builder; never null - */ - public MongoClientSettings.Builder settings() { - return settingsBuilder; - } - - /** - * Build the client pool that will use the credentials and options already configured on this builder. - * - * @return the new client pool; never null - */ - public MongoClients build() { - return new MongoClients(settingsBuilder); - } - } - - private final Map connections = new ConcurrentHashMap<>(); - - private final MongoClientSettings defaultSettings; - - private MongoClients(MongoClientSettings.Builder settings) { - this.defaultSettings = settings.build(); - } - - /** - * Creates fresh {@link MongoClientSettings.Builder} from {@link #defaultSettings} - * @return connection settings builder - */ - protected MongoClientSettings.Builder settings() { - return MongoClientSettings.builder(defaultSettings); - } - - /** - * Clear out and close any open connections. - */ - public void clear() { - connections.values().forEach(MongoClient::close); - connections.clear(); - } - - public MongoClient client(ConnectionString connectionString) { - return client(settings -> settings.applyConnectionString(connectionString)); - } - - public MongoClient client(ReplicaSet replicaSet, ReadPreference preference) { - return client(settings -> settings - .applyConnectionString(replicaSet.connectionString()) - .readPreference(preference)); - } - - protected MongoClient client(Consumer configurator) { - MongoClientSettings.Builder settings = settings(); - configurator.accept(settings); - - return connections.computeIfAbsent(settings.build(), com.mongodb.client.MongoClients::create); - } -} diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/MongoDbClientFactory.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/MongoDbClientFactory.java new file mode 100644 index 000000000..48f14a5b8 --- /dev/null +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/MongoDbClientFactory.java @@ -0,0 +1,67 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mongodb.connection; + +import java.util.function.Consumer; + +import com.mongodb.ConnectionString; +import com.mongodb.MongoClientSettings; +import com.mongodb.ReadPreference; +import com.mongodb.client.MongoClient; + +import io.debezium.annotation.ThreadSafe; + +/** + * A factory for MongoClient instances, each created from a shared settings template.
+ * + * @author Randall Hauch + */ +@ThreadSafe +public final class MongoDbClientFactory { + + private final MongoClientSettings defaultSettings; + + /** + * Obtains a new client factory + * + * @param configurator configurator of the settings used as a template for all clients + * @return mongo client factory + */ + public static MongoDbClientFactory create(Consumer configurator) { + var settings = MongoClientSettings.builder(); + configurator.accept(settings); + return new MongoDbClientFactory(settings); + } + + private MongoDbClientFactory(MongoClientSettings.Builder settings) { + this.defaultSettings = settings.build(); + } + + /** + * Creates fresh {@link MongoClientSettings.Builder} from {@link #defaultSettings} + * @return connection settings builder + */ + private MongoClientSettings.Builder settings() { + return MongoClientSettings.builder(defaultSettings); + } + + public MongoClient client(ConnectionString connectionString) { + return client(settings -> settings.applyConnectionString(connectionString)); + } + + public MongoClient client(ReplicaSet replicaSet, ReadPreference preference) { + return client(settings -> settings + .applyConnectionString(replicaSet.connectionString()) + .readPreference(preference)); + } + + private MongoClient client(Consumer configurator) { + MongoClientSettings.Builder settings = settings(); + configurator.accept(settings); + + return com.mongodb.client.MongoClients.create(settings.build()); + } +} diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/RetryingMongoClient.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/MongoDbConnection.java similarity index 66% rename from debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/RetryingMongoClient.java rename to debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/MongoDbConnection.java index 4e214c697..8789d622b 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/RetryingMongoClient.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/MongoDbConnection.java @@ -11,12 +11,11 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import com.mongodb.ConnectionString; import com.mongodb.ReadPreference; import com.mongodb.client.MongoClient; @@ -29,39 +28,62 @@ import io.debezium.util.Metronome; /** - * Client scoped to specified replica set. - * Internally this wrapper attempts to obtain regular {@link MongoClient} instance using given back-off strategy + * Scoped MongoDB connection which applies filter configuration and replica set specification when required. + * Internally this wrapper attempts to obtain a regular {@link MongoClient} instance. */ -public class RetryingMongoClient { +public final class MongoDbConnection implements AutoCloseable { + + @FunctionalInterface + public interface ErrorHandler { + /** + * + * @param desc the description of the operation, for logging purposes + * @param error the error which triggered this call + */ + void onError(String desc, Throwable error); + } + /** + * A pause between failed MongoDB operations to prevent CPU throttling and DoS of + * target MongoDB database.
*/ private static final Duration PAUSE_AFTER_ERROR = Duration.ofMillis(500); + public static ErrorHandler DEFAULT_ERROR_HANDLER = (String desc, Throwable error) -> { + throw new DebeziumException("Error while attempting to " + desc, error); + }; + private final Filters filters; - private final BiConsumer errorHandler; + private final ErrorHandler errorHandler; private final AtomicBoolean running = new AtomicBoolean(true); - private final String replicaSetName; + private final String name; private final Supplier connectionSupplier; - protected RetryingMongoClient(ReplicaSet replicaSet, - ReadPreference readPreference, - BiFunction connectionSupplier, - Filters filters, - BiConsumer errorHandler) { - this.replicaSetName = replicaSet.replicaSetName(); - this.connectionSupplier = () -> connectionSupplier.apply(replicaSet, readPreference); + protected MongoDbConnection(ReplicaSet replicaSet, + ReadPreference readPreference, + MongoDbClientFactory clientFactory, + Filters filters, + ErrorHandler errorHandler) { + this.name = replicaSet.replicaSetName(); + this.connectionSupplier = () -> clientFactory.client(replicaSet, readPreference); this.filters = filters; this.errorHandler = errorHandler; } + protected MongoDbConnection(ConnectionString connectionString, + MongoDbClientFactory clientFactory, + Filters filters) { + this.name = ConnectionStrings.mask(connectionString.getConnectionString()); + this.connectionSupplier = () -> clientFactory.client(connectionString); + this.filters = filters; + this.errorHandler = DEFAULT_ERROR_HANDLER; + } + /** - * Execute the supplied operation using the preferred node, blocking until it is available. Whenever the operation stops - * (e.g., if the node is no longer of preferred type), then restart the operation using a current node of that type. + * Execute the supplied operation. Whenever the operation fails the error handler is called and the operation is repeated * * @param desc the description of the operation, for logging purposes - * @param operation the operation to be performed on a node of preferred type. + * @param operation the operation to be performed */ public void execute(String desc, Consumer operation) { execute(desc, client -> { @@ -71,24 +93,22 @@ public void execute(String desc, Consumer operation) { } /** - * Execute the supplied operation using the preferred node, blocking until it is available. Whenever the operation stops - * (e.g., if the node is no longer of preferred type), then restart the operation using a current node of that type. + * Execute the supplied operation. 
Whenever the operation fails the error handler is called and the operation is repeated * * @param desc the description of the operation, for logging purposes - * @param operation the operation to be performed on a node of preferred type + * @param operation the operation to be performed * @return return value of the executed operation */ public T execute(String desc, Function operation) { final Metronome errorMetronome = Metronome.sleeper(PAUSE_AFTER_ERROR, Clock.SYSTEM); - MongoClient client = connectionSupplier.get(); while (true) { - try { + try (var client = connectionSupplier.get()) { return operation.apply(client); } catch (Throwable t) { - errorHandler.accept(desc, t); + errorHandler.onError(desc, t); if (!isRunning()) { - throw new DebeziumException("Operation failed and MongoDB connection '" + replicaSetName + "' termination requested", t); + throw new DebeziumException("Operation failed and MongoDB connection '" + name + "' termination requested", t); } try { errorMetronome.pause(); @@ -120,9 +140,9 @@ public void executeBlocking(String desc, BlockingConsumer operation throw e; } catch (Throwable t) { - errorHandler.accept(desc, t); + errorHandler.onError(desc, t); if (!isRunning()) { - throw new DebeziumException("Operation failed and MongoDB connection '" + replicaSetName + "' termination requested", t); + throw new DebeziumException("Operation failed and MongoDB connection to '" + name + "' termination requested", t); } errorMetronome.pause(); } @@ -130,9 +150,7 @@ public void executeBlocking(String desc, BlockingConsumer operation } /** - * Use a node of preferred type to get the names of all the databases in the replica set, applying the current database - * filter configuration. This method will block until a node of preferred type can be obtained to get the names of all - * databases in the replica set. + * Get the names of all the databases applying the current database filter configuration. * * @return the database names; never null but possibly empty */ @@ -153,21 +171,18 @@ public Set databaseNames() { } /** - * Use a node of preferred type to get the identifiers of all the collections in the replica set, applying the current - * collection filter configuration. This method will block until a primary can be obtained to get the - * identifiers of all collections in the replica set. + * Get the identifiers of all the collections, applying the current collection filter configuration. * * @return the collection identifiers; never null */ public List collections() { - // For each database, get the list of collections ... 
return execute("get collections in databases", client -> { List collections = new ArrayList<>(); Set databaseNames = databaseNames(); for (String dbName : databaseNames) { MongoUtil.forEachCollectionNameInDatabase(client, dbName, collectionName -> { - CollectionId collectionId = new CollectionId(replicaSetName, dbName, collectionName); + CollectionId collectionId = new CollectionId(name, dbName, collectionName); if (filters.collectionFilter().test(collectionId)) { collections.add(collectionId); @@ -183,10 +198,8 @@ private boolean isRunning() { return running.get(); } - /** - * Terminates the execution loop of the current primary - */ - public void stop() { + @Override + public void close() { running.set(false); } } diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/AbstractMongoConnectorIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/AbstractMongoConnectorIT.java index a64e4010f..1e6e03cb6 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/AbstractMongoConnectorIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/AbstractMongoConnectorIT.java @@ -74,14 +74,7 @@ public void beforeEach() { @After public void afterEach() { - try { - stopConnector(); - } - finally { - if (context != null) { - context.getConnectionContext().close(); - } - } + stopConnector(); } @BeforeClass diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/AbstractMongoIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/AbstractMongoIT.java index b4cc34bca..3e25b3451 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/AbstractMongoIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/AbstractMongoIT.java @@ -9,7 +9,6 @@ import java.util.Map; -import org.junit.After; import org.junit.Before; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,8 +16,8 @@ import com.mongodb.ReadPreference; import io.debezium.config.Configuration; +import io.debezium.connector.mongodb.connection.MongoDbConnection; import io.debezium.connector.mongodb.connection.ReplicaSet; -import io.debezium.connector.mongodb.connection.RetryingMongoClient; import io.debezium.util.Testing; public abstract class AbstractMongoIT extends AbstractBaseMongoIT { @@ -28,7 +27,7 @@ public abstract class AbstractMongoIT extends AbstractBaseMongoIT { protected Configuration config; protected MongoDbTaskContext context; protected ReplicaSet replicaSet; - protected RetryingMongoClient primary; + protected MongoDbConnection primary; @Before public void beforeEach() { @@ -88,12 +87,4 @@ private void initialize(boolean restartFromBeginning) { primary = context.getConnectionContext().connect( replicaSet, ReadPreference.primary(), context.filters(), TestHelper.connectionErrorHandler(3)); } - - @After - public void afterEach() { - if (context != null) { - // close all connections - context.getConnectionContext().close(); - } - } } diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ConnectionIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ConnectionIT.java index 606593afe..c4d4d31c4 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ConnectionIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ConnectionIT.java @@ -52,8 +52,8 @@ public void shouldUseSSL() throws InterruptedException, 
IOException { .with(MongoDbConnectorConfig.SERVER_SELECTION_TIMEOUT_MS, 2000) .build()); - primary.executeBlocking("Try SSL connection", mongo -> { - primary.stop(); + primary.execute("Try SSL connection", mongo -> { + primary.close(); mongo.getDatabase("dbit").listCollectionNames().first(); }); } diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbSchemaIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbSchemaIT.java index a930ed274..9483607f9 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbSchemaIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbSchemaIT.java @@ -7,7 +7,6 @@ import static org.assertj.core.api.Assertions.assertThat; -import org.junit.After; import org.junit.Test; import io.debezium.config.Configuration; @@ -21,13 +20,6 @@ public class MongoDbSchemaIT { private Configuration config; private MongoDbTaskContext taskContext; - @After - public void afterEach() { - if (taskContext != null) { - taskContext.getConnectionContext().close(); - } - } - @Test public void shouldAlwaysProduceCollectionSchema() { config = TestHelper.getConfiguration(); diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ReplicaSetDiscoveryTest.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ReplicaSetDiscoveryTest.java index febcfe196..7cd933801 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ReplicaSetDiscoveryTest.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ReplicaSetDiscoveryTest.java @@ -79,7 +79,7 @@ public void shouldGetFirstValidReplicaSetName() { when(mongoClient.getClusterDescription()).thenReturn(clusterDescription); - ReplicaSets replicaSets = replicaSetDiscovery.getReplicaSets(); + ReplicaSets replicaSets = replicaSetDiscovery.getReplicaSets(mongoClient); assertThat(replicaSets.all().size()).isEqualTo(1); assertThat(replicaSets.all().get(0).replicaSetName()).isEqualTo("my_rs"); } diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ReplicaSetsTest.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ReplicaSetsTest.java index 6a13ad091..074dc49e7 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ReplicaSetsTest.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ReplicaSetsTest.java @@ -10,11 +10,11 @@ import java.util.ArrayList; import java.util.List; -import io.debezium.connector.mongodb.connection.ConnectionStrings; import org.junit.Test; import com.mongodb.ConnectionString; +import io.debezium.connector.mongodb.connection.ConnectionStrings; import io.debezium.connector.mongodb.connection.ReplicaSet; /** diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/TestHelper.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/TestHelper.java index 300025127..01b2a0f48 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/TestHelper.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/TestHelper.java @@ -11,7 +11,6 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiConsumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -30,6 +29,7 @@ import 
io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; import io.debezium.config.Configuration.Builder; +import io.debezium.connector.mongodb.connection.MongoDbConnection; import io.debezium.testing.testcontainers.MongoDbReplicaSet; /** @@ -70,7 +70,7 @@ public static Configuration getConfiguration(String connectionString) { return cfgBuilder.build(); } - public static BiConsumer connectionErrorHandler(int numErrorsBeforeFailing) { + public static MongoDbConnection.ErrorHandler connectionErrorHandler(int numErrorsBeforeFailing) { AtomicInteger attempts = new AtomicInteger(); return (desc, error) -> { if (attempts.incrementAndGet() > numErrorsBeforeFailing) { diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/outbox/MongoEventRouterTestIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/outbox/MongoEventRouterTestIT.java index 0a329f992..590cbb3f7 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/outbox/MongoEventRouterTestIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/outbox/MongoEventRouterTestIT.java @@ -85,14 +85,7 @@ public void beforeEach(Configuration config) { @After public void afterEach() { - try { - stopConnector(); - } - finally { - if (context != null) { - context.getConnectionContext().close(); - } - } + stopConnector(); outboxEventRouter.close(); }
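
Not part of the patch — a minimal usage sketch of the refactored connection API introduced above, assuming a MongoDbTaskContext and a discovered ReplicaSet are already available from the existing connector code; the class ConnectionUsageSketch and method printDatabaseNames are illustrative names only:

    import com.mongodb.ReadPreference;

    import io.debezium.connector.mongodb.MongoDbTaskContext;
    import io.debezium.connector.mongodb.connection.MongoDbConnection;
    import io.debezium.connector.mongodb.connection.ReplicaSet;

    // Illustrative sketch: shows the try-with-resources pattern that replaces the removed stop()/close() handling.
    class ConnectionUsageSketch {

        static void printDatabaseNames(MongoDbTaskContext taskContext, ReplicaSet replicaSet) {
            var connectionContext = taskContext.getConnectionContext();

            // MongoDbConnection now implements AutoCloseable, so the connection is scoped to this block
            try (MongoDbConnection mongo = connectionContext.connect(
                    replicaSet, ReadPreference.secondaryPreferred(), taskContext.filters(),
                    MongoDbConnection.DEFAULT_ERROR_HANDLER)) {
                // databaseNames() applies the configured database filters before returning the names
                mongo.databaseNames().forEach(System.out::println);
            }
        }
    }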