DBZ-6032 Further simplifying connection logic
This commit is contained in:
parent
ecdb4729af
commit
f305bcd9ab
@ -118,7 +118,7 @@ public void start(Map<String, String> props) {
|
||||
// Set up and start the thread that monitors the members of all of the replica sets ...
|
||||
replicaSetMonitorExecutor = Threads.newSingleThreadExecutor(MongoDbConnector.class, taskContext.serverName(), "replica-set-monitor");
|
||||
ReplicaSetDiscovery monitor = new ReplicaSetDiscovery(taskContext);
|
||||
monitorThread = new ReplicaSetMonitorThread(monitor::getReplicaSets, connectionContext.pollInterval(),
|
||||
monitorThread = new ReplicaSetMonitorThread(connectionContext, monitor::getReplicaSets, connectionContext.pollInterval(),
|
||||
Clock.SYSTEM, () -> taskContext.configureLoggingContext("disc"), this::replicaSetsChanged);
|
||||
replicaSetMonitorExecutor.execute(monitorThread);
|
||||
logger.info("Successfully started MongoDB connector, and continuing to discover changes in replica set(s) at {}", connectionContext.maskedConnectionSeed());
|
||||
@ -186,14 +186,7 @@ public void stop() {
|
||||
if (replicaSetMonitorExecutor != null) {
|
||||
replicaSetMonitorExecutor.shutdownNow();
|
||||
}
|
||||
try {
|
||||
if (this.connectionContext != null) {
|
||||
this.connectionContext.close();
|
||||
}
|
||||
}
|
||||
finally {
|
||||
logger.info("Stopped MongoDB connector");
|
||||
}
|
||||
logger.info("Stopped MongoDB connector");
|
||||
}
|
||||
finally {
|
||||
if (previousLogContext != null) {
|
||||
@ -225,17 +218,17 @@ public Config validate(Map<String, String> connectorConfigs) {
|
||||
&& userValue.errorMessages().isEmpty()
|
||||
&& passwordValue.errorMessages().isEmpty()
|
||||
&& connectionStringValue.errorMessages().isEmpty()) {
|
||||
// Try to connect to the database ...
|
||||
try (ConnectionContext connContext = new ConnectionContext(config)) {
|
||||
|
||||
try (MongoClient client = connContext.connect()) {
|
||||
client.listDatabaseNames();
|
||||
}
|
||||
// Try to connect to the database ...
|
||||
ConnectionContext connContext = new ConnectionContext(config);
|
||||
try (MongoClient client = connContext.connect()) {
|
||||
client.listDatabaseNames();
|
||||
}
|
||||
catch (MongoException e) {
|
||||
hostsValue.addErrorMessage("Unable to connect: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
return new Config(new ArrayList<>(results.values()));
|
||||
}
|
||||
}
|
||||
|
@ -12,7 +12,6 @@
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import io.debezium.connector.mongodb.connection.ConnectionStrings;
|
||||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.common.config.ConfigDef.Importance;
|
||||
import org.apache.kafka.common.config.ConfigDef.Type;
|
||||
@ -32,6 +31,7 @@
|
||||
import io.debezium.config.Field.ValidationOutput;
|
||||
import io.debezium.connector.AbstractSourceInfo;
|
||||
import io.debezium.connector.SourceInfoStructMaker;
|
||||
import io.debezium.connector.mongodb.connection.ConnectionStrings;
|
||||
import io.debezium.data.Envelope;
|
||||
import io.debezium.schema.DefaultTopicNamingStrategy;
|
||||
import io.debezium.spi.schema.DataCollectionId;
|
||||
|
@ -26,8 +26,8 @@
|
||||
import io.debezium.DebeziumException;
|
||||
import io.debezium.annotation.NotThreadSafe;
|
||||
import io.debezium.connector.mongodb.connection.ConnectionContext;
|
||||
import io.debezium.connector.mongodb.connection.MongoDbConnection;
|
||||
import io.debezium.connector.mongodb.connection.ReplicaSet;
|
||||
import io.debezium.connector.mongodb.connection.RetryingMongoClient;
|
||||
import io.debezium.connector.mongodb.recordemitter.MongoDbSnapshotRecordEmitter;
|
||||
import io.debezium.pipeline.EventDispatcher;
|
||||
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
|
||||
@ -72,8 +72,7 @@ public class MongoDbIncrementalSnapshotChangeEventSource
|
||||
protected EventDispatcher<MongoDbPartition, CollectionId> dispatcher;
|
||||
protected IncrementalSnapshotContext<CollectionId> context = null;
|
||||
protected final Map<Struct, Object[]> window = new LinkedHashMap<>();
|
||||
private RetryingMongoClient primary;
|
||||
private RetryingMongoClient secondary;
|
||||
private MongoDbConnection mongo;
|
||||
|
||||
private CollectionId signallingCollectionId;
|
||||
|
||||
@ -180,7 +179,7 @@ protected void deduplicateWindow(DataCollectionId dataCollectionId, Object key)
|
||||
protected void emitWindowOpen() throws InterruptedException {
|
||||
final CollectionId collectionId = signallingCollectionId;
|
||||
final String id = context.currentChunkId() + "-open";
|
||||
primary.executeBlocking(
|
||||
mongo.executeBlocking(
|
||||
"emit window open for chunk '" + context.currentChunkId() + "'",
|
||||
client -> {
|
||||
final MongoDatabase database = client.getDatabase(collectionId.dbName());
|
||||
@ -201,7 +200,7 @@ protected void emitWindowOpen() throws InterruptedException {
|
||||
protected void emitWindowClose() throws InterruptedException {
|
||||
final CollectionId collectionId = signallingCollectionId;
|
||||
final String id = context.currentChunkId() + "-close";
|
||||
primary.executeBlocking(
|
||||
mongo.executeBlocking(
|
||||
"emit window close for chunk '" + context.currentChunkId() + "'",
|
||||
client -> {
|
||||
final MongoDatabase database = client.getDatabase(collectionId.dbName());
|
||||
@ -219,8 +218,9 @@ protected void emitWindowClose() throws InterruptedException {
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void init(MongoDbPartition partition, OffsetContext offsetContext) {
|
||||
primary = establishConnection(partition, ReadPreference.primary(), replicaSets.all().get(0));
|
||||
secondary = establishConnection(partition, ReadPreference.secondaryPreferred(), replicaSets.all().get(0));
|
||||
// Only ReplicaSet deployments are supported by incremental snapshot
|
||||
// Thus assume replicaSets.size() == 1
|
||||
mongo = establishConnection(partition, ReadPreference.secondaryPreferred(), replicaSets.all().get(0));
|
||||
|
||||
if (offsetContext == null) {
|
||||
LOGGER.info("Empty incremental snapshot change event source started, no action needed");
|
||||
@ -322,7 +322,7 @@ private void nextDataCollection(MongoDbPartition partition) {
|
||||
private Object[] readMaximumKey() throws InterruptedException {
|
||||
final CollectionId collectionId = (CollectionId) currentCollection.id();
|
||||
final AtomicReference<Object> key = new AtomicReference<>();
|
||||
secondary.executeBlocking("maximum key for '" + collectionId + "'", client -> {
|
||||
mongo.executeBlocking("maximum key for '" + collectionId + "'", client -> {
|
||||
final MongoDatabase database = client.getDatabase(collectionId.dbName());
|
||||
final MongoCollection<Document> collection = database.getCollection(collectionId.name());
|
||||
|
||||
@ -417,7 +417,7 @@ private void createDataEventsForDataCollection(MongoDbPartition partition) throw
|
||||
long exportStart = clock.currentTimeInMillis();
|
||||
LOGGER.debug("Exporting data chunk from collection '{}' (total {} collections)", currentCollection.id(), context.dataCollectionsToBeSnapshottedCount());
|
||||
|
||||
secondary.executeBlocking("chunk query key for '" + currentCollection.id() + "'", client -> {
|
||||
mongo.executeBlocking("chunk query key for '" + currentCollection.id() + "'", client -> {
|
||||
final MongoDatabase database = client.getDatabase(collectionId.dbName());
|
||||
final MongoCollection<BsonDocument> collection = database.getCollection(collectionId.name(), BsonDocument.class);
|
||||
|
||||
@ -564,7 +564,7 @@ public void processMessage(MongoDbPartition partition, DataCollectionId dataColl
|
||||
}
|
||||
}
|
||||
|
||||
private RetryingMongoClient establishConnection(MongoDbPartition partition, ReadPreference preference, ReplicaSet replicaSet) {
|
||||
private MongoDbConnection establishConnection(MongoDbPartition partition, ReadPreference preference, ReplicaSet replicaSet) {
|
||||
return connectionContext.connect(replicaSet, preference, taskContext.filters(), (desc, error) -> {
|
||||
// propagate authorization failures
|
||||
if (error.getMessage() != null && error.getMessage().startsWith(AUTHORIZATION_FAILURE_MESSAGE)) {
|
||||
|
@ -34,8 +34,8 @@
|
||||
import io.debezium.DebeziumException;
|
||||
import io.debezium.connector.SnapshotRecord;
|
||||
import io.debezium.connector.mongodb.connection.ConnectionContext;
|
||||
import io.debezium.connector.mongodb.connection.MongoDbConnection;
|
||||
import io.debezium.connector.mongodb.connection.ReplicaSet;
|
||||
import io.debezium.connector.mongodb.connection.RetryingMongoClient;
|
||||
import io.debezium.connector.mongodb.recordemitter.MongoDbSnapshotRecordEmitter;
|
||||
import io.debezium.pipeline.ErrorHandler;
|
||||
import io.debezium.pipeline.EventDispatcher;
|
||||
@ -146,14 +146,8 @@ protected SnapshotResult<MongoDbOffsetContext> doExecute(ChangeEventSourceContex
|
||||
aborted.set(true);
|
||||
}
|
||||
|
||||
// Shutdown executor and close connections
|
||||
try {
|
||||
executor.shutdown();
|
||||
}
|
||||
finally {
|
||||
LOGGER.info("Stopping mongodb connections");
|
||||
taskContext.getConnectionContext().close();
|
||||
}
|
||||
// Shutdown the executor
|
||||
executor.shutdown();
|
||||
|
||||
if (aborted.get()) {
|
||||
return SnapshotResult.aborted();
|
||||
@ -181,26 +175,17 @@ protected SnapshottingTask getSnapshottingTask(MongoDbPartition partition, Mongo
|
||||
|
||||
// Collect which replica-sets require being snapshotted
|
||||
final List<ReplicaSet> replicaSetSnapshots = new ArrayList<>();
|
||||
final MongoDbOffsetContext offsetContext = (MongoDbOffsetContext) previousOffset;
|
||||
try {
|
||||
replicaSets.onEachReplicaSet(replicaSet -> {
|
||||
RetryingMongoClient mongo = null;
|
||||
try {
|
||||
mongo = establishConnection(partition, replicaSet, ReadPreference.primaryPreferred());
|
||||
final ReplicaSetOffsetContext rsOffsetContext = offsetContext.getReplicaSetOffsetContext(replicaSet);
|
||||
if (mongo != null && isSnapshotExpected(mongo, rsOffsetContext)) {
|
||||
replicaSetSnapshots.add(replicaSet);
|
||||
}
|
||||
|
||||
for (var replicaSet : replicaSets.all()) {
|
||||
try (MongoDbConnection mongo = establishConnection(partition, replicaSet, ReadPreference.primaryPreferred())) {
|
||||
final ReplicaSetOffsetContext rsOffsetContext = previousOffset.getReplicaSetOffsetContext(replicaSet);
|
||||
if (mongo != null && isSnapshotExpected(mongo, rsOffsetContext)) {
|
||||
replicaSetSnapshots.add(replicaSet);
|
||||
}
|
||||
finally {
|
||||
if (mongo != null) {
|
||||
mongo.stop();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
finally {
|
||||
taskContext.getConnectionContext().close();
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
throw new DebeziumException("Interrupted while creating snapshotting task", e);
|
||||
}
|
||||
}
|
||||
|
||||
return new MongoDbSnapshottingTask(replicaSetSnapshots);
|
||||
@ -213,21 +198,12 @@ protected SnapshotContext<MongoDbPartition, MongoDbOffsetContext> prepare(MongoD
|
||||
}
|
||||
|
||||
private void snapshotReplicaSet(ChangeEventSourceContext sourceContext, MongoDbSnapshotContext ctx, ReplicaSet replicaSet) throws InterruptedException {
|
||||
RetryingMongoClient mongo = null;
|
||||
try {
|
||||
mongo = establishConnection(ctx.partition, replicaSet, ReadPreference.secondaryPreferred());
|
||||
if (mongo != null) {
|
||||
createDataEvents(sourceContext, ctx, replicaSet, mongo);
|
||||
}
|
||||
}
|
||||
finally {
|
||||
if (mongo != null) {
|
||||
mongo.stop();
|
||||
}
|
||||
try (MongoDbConnection mongo = establishConnection(ctx.partition, replicaSet, ReadPreference.secondaryPreferred())) {
|
||||
createDataEvents(sourceContext, ctx, replicaSet, mongo);
|
||||
}
|
||||
}
|
||||
|
||||
private RetryingMongoClient establishConnection(MongoDbPartition partition, ReplicaSet replicaSet, ReadPreference preference) {
|
||||
private MongoDbConnection establishConnection(MongoDbPartition partition, ReplicaSet replicaSet, ReadPreference preference) {
|
||||
return connectionContext.connect(replicaSet, preference, taskContext.filters(), (desc, error) -> {
|
||||
// propagate authorization failures
|
||||
if (error.getMessage() != null && error.getMessage().startsWith(AUTHORIZATION_FAILURE_MESSAGE)) {
|
||||
@ -241,7 +217,7 @@ private RetryingMongoClient establishConnection(MongoDbPartition partition, Repl
|
||||
});
|
||||
}
|
||||
|
||||
private boolean isSnapshotExpected(RetryingMongoClient mongo, ReplicaSetOffsetContext offsetContext) {
|
||||
private boolean isSnapshotExpected(MongoDbConnection mongo, ReplicaSetOffsetContext offsetContext) throws InterruptedException {
|
||||
boolean performSnapshot = true;
|
||||
if (offsetContext.hasOffset()) {
|
||||
if (LOGGER.isInfoEnabled()) {
|
||||
@ -292,17 +268,11 @@ protected void determineSnapshotOffsets(MongoDbSnapshotContext ctx, ReplicaSets
|
||||
final Map<ReplicaSet, BsonDocument> positions = new LinkedHashMap<>();
|
||||
replicaSets.onEachReplicaSet(replicaSet -> {
|
||||
LOGGER.info("Determine Snapshot Offset for replica-set {}", replicaSet.replicaSetName());
|
||||
RetryingMongoClient mongo = establishConnection(ctx.partition, replicaSet, ReadPreference.primaryPreferred());
|
||||
if (mongo != null) {
|
||||
try {
|
||||
mongo.execute("get oplog position", client -> {
|
||||
positions.put(replicaSet, MongoUtil.getOplogEntry(client, -1, LOGGER));
|
||||
});
|
||||
}
|
||||
finally {
|
||||
LOGGER.info("Stopping primary client");
|
||||
mongo.stop();
|
||||
}
|
||||
|
||||
try (MongoDbConnection mongo = establishConnection(ctx.partition, replicaSet, ReadPreference.primaryPreferred())) {
|
||||
mongo.execute("get oplog position", client -> {
|
||||
positions.put(replicaSet, MongoUtil.getOplogEntry(client, -1, LOGGER));
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
@ -311,7 +281,7 @@ protected void determineSnapshotOffsets(MongoDbSnapshotContext ctx, ReplicaSets
|
||||
}
|
||||
|
||||
private void createDataEvents(ChangeEventSourceContext sourceContext, MongoDbSnapshotContext snapshotContext, ReplicaSet replicaSet,
|
||||
RetryingMongoClient mongo)
|
||||
MongoDbConnection mongo)
|
||||
throws InterruptedException {
|
||||
SnapshotReceiver<MongoDbPartition> snapshotReceiver = dispatcher.getSnapshotChangeEventReceiver();
|
||||
snapshotContext.offset.preSnapshotStart();
|
||||
@ -329,7 +299,7 @@ private void createDataEvents(ChangeEventSourceContext sourceContext, MongoDbSna
|
||||
private void createDataEventsForReplicaSet(ChangeEventSourceContext sourceContext,
|
||||
MongoDbSnapshotContext snapshotContext,
|
||||
SnapshotReceiver<MongoDbPartition> snapshotReceiver,
|
||||
ReplicaSet replicaSet, RetryingMongoClient mongo)
|
||||
ReplicaSet replicaSet, MongoDbConnection mongo)
|
||||
throws InterruptedException {
|
||||
|
||||
final String rsName = replicaSet.replicaSetName();
|
||||
@ -435,7 +405,7 @@ private void createDataEventsForReplicaSet(ChangeEventSourceContext sourceContex
|
||||
private void createDataEventsForCollection(ChangeEventSourceContext sourceContext,
|
||||
MongoDbSnapshotContext snapshotContext,
|
||||
SnapshotReceiver<MongoDbPartition> snapshotReceiver,
|
||||
ReplicaSet replicaSet, CollectionId collectionId, RetryingMongoClient mongo)
|
||||
ReplicaSet replicaSet, CollectionId collectionId, MongoDbConnection mongo)
|
||||
throws InterruptedException {
|
||||
|
||||
long exportStart = clock.currentTimeInMillis();
|
||||
|
@ -30,8 +30,8 @@
|
||||
|
||||
import io.debezium.DebeziumException;
|
||||
import io.debezium.connector.mongodb.connection.ConnectionContext;
|
||||
import io.debezium.connector.mongodb.connection.MongoDbConnection;
|
||||
import io.debezium.connector.mongodb.connection.ReplicaSet;
|
||||
import io.debezium.connector.mongodb.connection.RetryingMongoClient;
|
||||
import io.debezium.connector.mongodb.recordemitter.MongoDbChangeRecordEmitter;
|
||||
import io.debezium.pipeline.ErrorHandler;
|
||||
import io.debezium.pipeline.EventDispatcher;
|
||||
@ -85,42 +85,28 @@ public void execute(ChangeEventSourceContext context, MongoDbPartition partition
|
||||
offsetContext = initializeOffsets(connectorConfig, partition, replicaSets);
|
||||
}
|
||||
|
||||
try {
|
||||
if (validReplicaSets.size() == 1) {
|
||||
// Streams the replica-set changes in the current thread
|
||||
streamChangesForReplicaSet(context, partition, validReplicaSets.get(0), offsetContext);
|
||||
}
|
||||
else if (validReplicaSets.size() > 1) {
|
||||
// Starts a thread for each replica-set and executes the streaming process
|
||||
streamChangesForReplicaSets(context, partition, validReplicaSets, offsetContext);
|
||||
}
|
||||
if (validReplicaSets.size() == 1) {
|
||||
// Streams the replica-set changes in the current thread
|
||||
streamChangesForReplicaSet(context, partition, validReplicaSets.get(0), offsetContext);
|
||||
}
|
||||
finally {
|
||||
taskContext.getConnectionContext().close();
|
||||
else if (validReplicaSets.size() > 1) {
|
||||
// Starts a thread for each replica-set and executes the streaming process
|
||||
streamChangesForReplicaSets(context, partition, validReplicaSets, offsetContext);
|
||||
}
|
||||
}
|
||||
|
||||
private void streamChangesForReplicaSet(ChangeEventSourceContext context, MongoDbPartition partition,
|
||||
ReplicaSet replicaSet, MongoDbOffsetContext offsetContext) {
|
||||
RetryingMongoClient mongo = null;
|
||||
try {
|
||||
mongo = establishConnection(partition, replicaSet, ReadPreference.secondaryPreferred());
|
||||
if (mongo != null) {
|
||||
final AtomicReference<RetryingMongoClient> mongoReference = new AtomicReference<>(mongo);
|
||||
mongo.execute("read from change stream on '" + replicaSet + "'", client -> {
|
||||
readChangeStream(client, mongoReference.get(), replicaSet, context, offsetContext);
|
||||
});
|
||||
}
|
||||
try (MongoDbConnection mongo = establishConnection(partition, replicaSet, ReadPreference.secondaryPreferred())) {
|
||||
final AtomicReference<MongoDbConnection> mongoReference = new AtomicReference<>(mongo);
|
||||
mongo.execute("read from change stream on '" + replicaSet + "'", client -> {
|
||||
readChangeStream(client, mongoReference.get(), replicaSet, context, offsetContext);
|
||||
});
|
||||
}
|
||||
catch (Throwable t) {
|
||||
LOGGER.error("Streaming for replica set {} failed", replicaSet.replicaSetName(), t);
|
||||
errorHandler.setProducerThrowable(t);
|
||||
}
|
||||
finally {
|
||||
if (mongo != null) {
|
||||
mongo.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void streamChangesForReplicaSets(ChangeEventSourceContext context, MongoDbPartition partition,
|
||||
@ -153,7 +139,7 @@ private void streamChangesForReplicaSets(ChangeEventSourceContext context, Mongo
|
||||
executor.shutdown();
|
||||
}
|
||||
|
||||
private RetryingMongoClient establishConnection(MongoDbPartition partition, ReplicaSet replicaSet, ReadPreference preference) {
|
||||
private MongoDbConnection establishConnection(MongoDbPartition partition, ReplicaSet replicaSet, ReadPreference preference) {
|
||||
return connectionContext.connect(replicaSet, preference, taskContext.filters(), (desc, error) -> {
|
||||
// propagate authorization failures
|
||||
if (error.getMessage() != null && error.getMessage().startsWith(AUTHORIZATION_FAILURE_MESSAGE)) {
|
||||
@ -167,7 +153,7 @@ private RetryingMongoClient establishConnection(MongoDbPartition partition, Repl
|
||||
});
|
||||
}
|
||||
|
||||
private void readChangeStream(MongoClient client, RetryingMongoClient mongo, ReplicaSet replicaSet, ChangeEventSourceContext context,
|
||||
private void readChangeStream(MongoClient client, MongoDbConnection mongo, ReplicaSet replicaSet, ChangeEventSourceContext context,
|
||||
MongoDbOffsetContext offsetContext) {
|
||||
final ReplicaSetPartition rsPartition = offsetContext.getReplicaSetPartition(replicaSet);
|
||||
final ReplicaSetOffsetContext rsOffsetContext = offsetContext.getReplicaSetOffsetContext(replicaSet);
|
||||
@ -271,17 +257,11 @@ protected MongoDbOffsetContext initializeOffsets(MongoDbConnectorConfig connecto
|
||||
final Map<ReplicaSet, BsonDocument> positions = new LinkedHashMap<>();
|
||||
replicaSets.onEachReplicaSet(replicaSet -> {
|
||||
LOGGER.info("Determine Snapshot Offset for replica-set {}", replicaSet.replicaSetName());
|
||||
RetryingMongoClient mongo = establishConnection(partition, replicaSet, ReadPreference.primaryPreferred());
|
||||
if (mongo != null) {
|
||||
try {
|
||||
mongo.execute("get oplog position", client -> {
|
||||
positions.put(replicaSet, MongoUtil.getOplogEntry(client, -1, LOGGER));
|
||||
});
|
||||
}
|
||||
finally {
|
||||
LOGGER.info("Stopping primary client");
|
||||
mongo.stop();
|
||||
}
|
||||
|
||||
try (MongoDbConnection mongo = establishConnection(partition, replicaSet, ReadPreference.primaryPreferred())) {
|
||||
mongo.execute("get oplog position", client -> {
|
||||
positions.put(replicaSet, MongoUtil.getOplogEntry(client, -1, LOGGER));
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
@ -295,11 +275,11 @@ protected MongoDbOffsetContext initializeOffsets(MongoDbConnectorConfig connecto
|
||||
private static class ReplicaSetChangeStreamsContext {
|
||||
private final ReplicaSetPartition partition;
|
||||
private final ReplicaSetOffsetContext offset;
|
||||
private final RetryingMongoClient mongo;
|
||||
private final MongoDbConnection mongo;
|
||||
private final ReplicaSet replicaSet;
|
||||
|
||||
ReplicaSetChangeStreamsContext(ReplicaSetPartition partition, ReplicaSetOffsetContext offsetContext,
|
||||
RetryingMongoClient mongo, ReplicaSet replicaSet) {
|
||||
MongoDbConnection mongo, ReplicaSet replicaSet) {
|
||||
this.partition = partition;
|
||||
this.offset = offsetContext;
|
||||
this.mongo = mongo;
|
||||
@ -314,7 +294,7 @@ ReplicaSetOffsetContext getOffset() {
|
||||
return offset;
|
||||
}
|
||||
|
||||
RetryingMongoClient getMongo() {
|
||||
MongoDbConnection getMongo() {
|
||||
return mongo;
|
||||
}
|
||||
|
||||
|
@ -8,10 +8,10 @@
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import io.debezium.connector.mongodb.connection.ConnectionStrings;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.mongodb.ConnectionString;
|
||||
import com.mongodb.MongoException;
|
||||
import com.mongodb.MongoInterruptedException;
|
||||
import com.mongodb.client.MongoClient;
|
||||
@ -20,6 +20,7 @@
|
||||
|
||||
import io.debezium.annotation.ThreadSafe;
|
||||
import io.debezium.connector.mongodb.connection.ConnectionContext;
|
||||
import io.debezium.connector.mongodb.connection.ConnectionStrings;
|
||||
import io.debezium.connector.mongodb.connection.ReplicaSet;
|
||||
|
||||
/**
|
||||
@ -34,6 +35,7 @@ public class ReplicaSetDiscovery {
|
||||
* The database that might be used to check for replica set information in a sharded cluster.
|
||||
*/
|
||||
public static final String CONFIG_DATABASE_NAME = "config";
|
||||
public static final String SHARDS_COLLECTION_NAME = "shards";
|
||||
|
||||
/**
|
||||
* The database that might be used to check for member information in a replica set.
|
||||
@ -61,74 +63,88 @@ public ReplicaSetDiscovery(MongoDbTaskContext context) {
|
||||
*
|
||||
* @return the information about the replica sets; never null but possibly empty
|
||||
*/
|
||||
public ReplicaSets getReplicaSets() {
|
||||
public ReplicaSets getReplicaSets(MongoClient client) {
|
||||
ConnectionContext connectionContext = context.getConnectionContext();
|
||||
MongoClient client = connectionContext.connect();
|
||||
Set<ReplicaSet> replicaSetSpecs = new HashSet<>();
|
||||
|
||||
final ClusterDescription clusterDescription = MongoUtil.clusterDescription(client);
|
||||
|
||||
if (clusterDescription.getType() == ClusterType.SHARDED) {
|
||||
LOGGER.info("Cluster at {} identified as sharded cluster", maskedConnectionSeed);
|
||||
|
||||
// Gather connection details to each shard ...
|
||||
String shardsCollection = "shards";
|
||||
try {
|
||||
MongoUtil.onCollectionDocuments(client, CONFIG_DATABASE_NAME, shardsCollection, doc -> {
|
||||
String shardName = doc.getString("_id");
|
||||
String hostStr = doc.getString("host");
|
||||
|
||||
LOGGER.info("Reading shard details for {}", shardName);
|
||||
|
||||
ConnectionStrings.parseFromHosts(hostStr).ifPresentOrElse(
|
||||
cs -> replicaSetSpecs.add(new ReplicaSet(cs)),
|
||||
() -> LOGGER.info("Shard {} is not a valid replica set", shardName));
|
||||
});
|
||||
}
|
||||
catch (MongoInterruptedException e) {
|
||||
LOGGER.error("Interrupted while reading the '{}' collection in the '{}' database: {}",
|
||||
shardsCollection, CONFIG_DATABASE_NAME, e.getMessage(), e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
catch (MongoException e) {
|
||||
LOGGER.error("Error while reading the '{}' collection in the '{}' database: {}",
|
||||
shardsCollection, CONFIG_DATABASE_NAME, e.getMessage(), e);
|
||||
}
|
||||
readReplicaSetsFromShardedCluster(replicaSetSpecs, client);
|
||||
}
|
||||
|
||||
if (clusterDescription.getType() == ClusterType.REPLICA_SET) {
|
||||
LOGGER.info("Cluster at '{}' identified as replicaSet", maskedConnectionSeed);
|
||||
|
||||
var connectionString = connectionContext.connectionSeed();
|
||||
var cs = connectionContext.connectionString();
|
||||
|
||||
if (cs.getRequiredReplicaSetName() == null) {
|
||||
// Java driver is smart enough to connect correctly
|
||||
// However replicaSet parameter is mandatory, and we need the name for offset storage
|
||||
LOGGER.warn("Replica set not specified in '{}'", maskedConnectionSeed);
|
||||
LOGGER.warn("Parameter 'replicaSet' should be added to connection string");
|
||||
LOGGER.warn("Trying to determine replica set name for '{}'", maskedConnectionSeed);
|
||||
var rsName = MongoUtil.replicaSetName(clusterDescription);
|
||||
|
||||
if (rsName.isPresent()) {
|
||||
LOGGER.info("Found '{}' replica set for '{}'", rsName.get(), maskedConnectionSeed);
|
||||
connectionString = ConnectionStrings.appendParameter(connectionString, "replicaSet", rsName.get());
|
||||
}
|
||||
else {
|
||||
LOGGER.warn("Unable to find replica set name for '{}'", maskedConnectionSeed);
|
||||
}
|
||||
}
|
||||
|
||||
LOGGER.info("Using '{}' as replica set connection string", ConnectionStrings.mask(connectionString));
|
||||
replicaSetSpecs.add(new ReplicaSet(connectionString));
|
||||
readReplicaSetsFromCluster(replicaSetSpecs, clusterDescription, connectionContext);
|
||||
}
|
||||
|
||||
if (replicaSetSpecs.isEmpty()) {
|
||||
// Without a replica sets, we can't do anything ...
|
||||
LOGGER.error(
|
||||
"Found no replica sets at {}, so there is nothing to monitor and no connector tasks will be started. Check seed addresses in connector configuration.",
|
||||
maskedConnectionSeed);
|
||||
LOGGER.error("Found no replica sets at {}, so there is nothing to monitor and no connector tasks will be started.", maskedConnectionSeed);
|
||||
}
|
||||
|
||||
return new ReplicaSets(replicaSetSpecs);
|
||||
}
|
||||
|
||||
private void readReplicaSetsFromCluster(Set<ReplicaSet> replicaSetSpecs, ClusterDescription clusterDescription, ConnectionContext connectionContext) {
|
||||
var connectionString = ensureReplicaSetName(connectionContext.connectionSeed(), clusterDescription);
|
||||
|
||||
LOGGER.info("Using '{}' as replica set connection string", ConnectionStrings.mask(connectionString));
|
||||
replicaSetSpecs.add(new ReplicaSet(connectionString));
|
||||
}
|
||||
|
||||
private void readReplicaSetsFromShardedCluster(Set<ReplicaSet> replicaSetSpecs, MongoClient client) {
|
||||
try {
|
||||
MongoUtil.onCollectionDocuments(client, CONFIG_DATABASE_NAME, SHARDS_COLLECTION_NAME, doc -> {
|
||||
String shardName = doc.getString("_id");
|
||||
String hostStr = doc.getString("host");
|
||||
|
||||
LOGGER.info("Reading shard details for {}", shardName);
|
||||
|
||||
ConnectionStrings.parseFromHosts(hostStr).ifPresentOrElse(
|
||||
cs -> replicaSetSpecs.add(new ReplicaSet(cs)),
|
||||
() -> LOGGER.info("Shard {} is not a valid replica set", shardName));
|
||||
});
|
||||
}
|
||||
catch (MongoInterruptedException e) {
|
||||
LOGGER.error("Interrupted while reading the '{}' collection in the '{}' database: {}",
|
||||
SHARDS_COLLECTION_NAME, CONFIG_DATABASE_NAME, e.getMessage(), e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
catch (MongoException e) {
|
||||
LOGGER.error("Error while reading the '{}' collection in the '{}' database: {}",
|
||||
SHARDS_COLLECTION_NAME, CONFIG_DATABASE_NAME, e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensures connection string contains the replicaSet parameter.If connection string doesn't contain the parameter,
|
||||
* the replica set name is read from cluster description and the parameter is added.
|
||||
*
|
||||
* @param connectionString the original connection string
|
||||
* @param clusterDescription cluster description
|
||||
* @return connection string with replicaSet parameter
|
||||
*/
|
||||
private String ensureReplicaSetName(String connectionString, ClusterDescription clusterDescription) {
|
||||
// If we have replicaSet parameter then just return
|
||||
var cs = new ConnectionString(connectionString);
|
||||
if (cs.getRequiredReplicaSetName() != null) {
|
||||
return connectionString;
|
||||
}
|
||||
|
||||
// Otherwise Java driver is smart enough to connect correctly
|
||||
// However replicaSet parameter is mandatory, and we need the name for offset storage
|
||||
LOGGER.warn("Replica set not specified in '{}'", maskedConnectionSeed);
|
||||
LOGGER.warn("Parameter 'replicaSet' should be added to connection string");
|
||||
LOGGER.warn("Trying to determine replica set name for '{}'", maskedConnectionSeed);
|
||||
var rsName = MongoUtil.replicaSetName(clusterDescription);
|
||||
|
||||
if (rsName.isPresent()) {
|
||||
LOGGER.info("Found '{}' replica set for '{}'", rsName.get(), maskedConnectionSeed);
|
||||
return ConnectionStrings.appendParameter(connectionString, "replicaSet", rsName.get());
|
||||
}
|
||||
|
||||
LOGGER.warn("Unable to find replica set name for '{}'", maskedConnectionSeed);
|
||||
return connectionString;
|
||||
}
|
||||
}
|
||||
|
@ -9,13 +9,15 @@
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.mongodb.MongoInterruptedException;
|
||||
import com.mongodb.client.MongoClient;
|
||||
|
||||
import io.debezium.connector.mongodb.connection.ConnectionContext;
|
||||
import io.debezium.util.Clock;
|
||||
import io.debezium.util.Metronome;
|
||||
|
||||
@ -30,7 +32,8 @@ public final class ReplicaSetMonitorThread implements Runnable {
|
||||
private final Logger logger = LoggerFactory.getLogger(getClass());
|
||||
private final Metronome metronome;
|
||||
private final CountDownLatch initialized = new CountDownLatch(1);
|
||||
private final Supplier<ReplicaSets> monitor;
|
||||
private final ConnectionContext connectionContext;
|
||||
private final Function<MongoClient, ReplicaSets> monitor;
|
||||
private final Consumer<ReplicaSets> onChange;
|
||||
private final Runnable onStartup;
|
||||
private volatile ReplicaSets replicaSets = ReplicaSets.empty();
|
||||
@ -42,11 +45,12 @@ public final class ReplicaSetMonitorThread implements Runnable {
|
||||
* @param onStartup the function to call when the thread is started; may be null if not needed
|
||||
* @param onChange the function to call when the set of replica set specifications has changed; may be null if not needed
|
||||
*/
|
||||
public ReplicaSetMonitorThread(Supplier<ReplicaSets> monitor, Duration period, Clock clock, Runnable onStartup,
|
||||
public ReplicaSetMonitorThread(ConnectionContext connectionContext, Function<MongoClient, ReplicaSets> monitor, Duration period, Clock clock, Runnable onStartup,
|
||||
Consumer<ReplicaSets> onChange) {
|
||||
if (clock == null) {
|
||||
clock = Clock.system();
|
||||
}
|
||||
this.connectionContext = connectionContext;
|
||||
this.monitor = monitor;
|
||||
this.metronome = Metronome.sleeper(period, clock);
|
||||
this.onChange = onChange != null ? onChange : (rsSpecs) -> {
|
||||
@ -62,9 +66,9 @@ public void run() {
|
||||
}
|
||||
|
||||
while (!Thread.currentThread().isInterrupted()) {
|
||||
try {
|
||||
try (var client = connectionContext.connect()) {
|
||||
ReplicaSets previousReplicaSets = replicaSets;
|
||||
replicaSets = monitor.get();
|
||||
replicaSets = monitor.apply(client);
|
||||
initialized.countDown();
|
||||
// Determine if any replica set specifications have changed ...
|
||||
if (replicaSets.haveChangedSince(previousReplicaSets)) {
|
||||
|
@ -7,7 +7,6 @@
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -26,12 +25,12 @@
|
||||
* @author Randall Hauch
|
||||
*
|
||||
*/
|
||||
public class ConnectionContext implements AutoCloseable {
|
||||
public class ConnectionContext {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionContext.class);
|
||||
|
||||
protected final Configuration config;
|
||||
protected final MongoClients pool;
|
||||
private final Configuration config;
|
||||
private final MongoDbClientFactory clientFactory;
|
||||
|
||||
/**
|
||||
* @param config the configuration
|
||||
@ -39,8 +38,6 @@ public class ConnectionContext implements AutoCloseable {
|
||||
public ConnectionContext(Configuration config) {
|
||||
this.config = config;
|
||||
|
||||
final ConnectionString connectionString = connectionString();
|
||||
|
||||
final String username = config.getString(MongoDbConnectorConfig.USER);
|
||||
final String password = config.getString(MongoDbConnectorConfig.PASSWORD);
|
||||
final String adminDbName = config.getString(MongoDbConnectorConfig.AUTH_SOURCE);
|
||||
@ -53,42 +50,34 @@ public ConnectionContext(Configuration config) {
|
||||
final int serverSelectionTimeoutMs = config.getInteger(MongoDbConnectorConfig.SERVER_SELECTION_TIMEOUT_MS);
|
||||
|
||||
// Set up the client pool so that it ...
|
||||
MongoClients.Builder poolBuilder = MongoClients.create();
|
||||
clientFactory = MongoDbClientFactory.create(settings -> {
|
||||
settings.applyToSocketSettings(builder -> builder.connectTimeout(connectTimeoutMs, TimeUnit.MILLISECONDS)
|
||||
.readTimeout(socketTimeoutMs, TimeUnit.MILLISECONDS))
|
||||
.applyToClusterSettings(
|
||||
builder -> builder.serverSelectionTimeout(serverSelectionTimeoutMs, TimeUnit.MILLISECONDS))
|
||||
.applyToServerSettings(
|
||||
builder -> builder.heartbeatFrequency(heartbeatFrequencyMs, TimeUnit.MILLISECONDS));
|
||||
|
||||
poolBuilder.settings()
|
||||
.applyToSocketSettings(builder -> builder.connectTimeout(connectTimeoutMs, TimeUnit.MILLISECONDS)
|
||||
.readTimeout(socketTimeoutMs, TimeUnit.MILLISECONDS))
|
||||
.applyToClusterSettings(
|
||||
builder -> builder.serverSelectionTimeout(serverSelectionTimeoutMs, TimeUnit.MILLISECONDS))
|
||||
.applyToServerSettings(
|
||||
builder -> builder.heartbeatFrequency(heartbeatFrequencyMs, TimeUnit.MILLISECONDS));
|
||||
// Use credential if provided as properties
|
||||
if (username != null || password != null) {
|
||||
settings.credential(MongoCredential.createCredential(username, adminDbName, password.toCharArray()));
|
||||
}
|
||||
if (useSSL) {
|
||||
settings.applyToSslSettings(
|
||||
builder -> builder.enabled(true).invalidHostNameAllowed(sslAllowInvalidHostnames));
|
||||
}
|
||||
|
||||
// Use credential if provided as properties
|
||||
if (username != null || password != null) {
|
||||
poolBuilder.withCredential(MongoCredential.createCredential(username, adminDbName, password.toCharArray()));
|
||||
}
|
||||
if (useSSL) {
|
||||
poolBuilder.settings().applyToSslSettings(
|
||||
builder -> builder.enabled(true).invalidHostNameAllowed(sslAllowInvalidHostnames));
|
||||
}
|
||||
|
||||
poolBuilder.settings()
|
||||
.applyToSocketSettings(builder -> builder.connectTimeout(connectTimeoutMs, TimeUnit.MILLISECONDS)
|
||||
.readTimeout(socketTimeoutMs, TimeUnit.MILLISECONDS))
|
||||
.applyToClusterSettings(
|
||||
builder -> builder.serverSelectionTimeout(serverSelectionTimeoutMs, TimeUnit.MILLISECONDS));
|
||||
|
||||
pool = poolBuilder.build();
|
||||
settings.applyToSocketSettings(builder -> builder.connectTimeout(connectTimeoutMs, TimeUnit.MILLISECONDS)
|
||||
.readTimeout(socketTimeoutMs, TimeUnit.MILLISECONDS))
|
||||
.applyToClusterSettings(
|
||||
builder -> builder.serverSelectionTimeout(serverSelectionTimeoutMs, TimeUnit.MILLISECONDS));
|
||||
});
|
||||
}
|
||||
|
||||
public MongoDbConnectorConfig getConnectorConfig() {
|
||||
return new MongoDbConnectorConfig(config);
|
||||
}
|
||||
|
||||
protected Logger logger() {
|
||||
return LOGGER;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initial connection string which is either a host specification or connection string
|
||||
*
|
||||
@ -121,7 +110,7 @@ public Duration pollInterval() {
|
||||
}
|
||||
|
||||
public MongoClient connect() {
|
||||
return pool.client(connectionString());
|
||||
return clientFactory.client(connectionString());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -130,23 +119,11 @@ public MongoClient connect() {
|
||||
* @param replicaSet the replica set information; may not be null
|
||||
* @param filters the filter configuration
|
||||
* @param errorHandler the function to be called whenever the node is unable to
|
||||
* {@link RetryingMongoClient#execute(String, BlockingConsumer)} execute} an operation to completion; may be null
|
||||
* {@link MongoDbConnection#execute(String, BlockingConsumer)} execute} an operation to completion; may be null
|
||||
* @return the client, or {@code null} if no primary could be found for the replica set
|
||||
*/
|
||||
public RetryingMongoClient connect(ReplicaSet replicaSet, ReadPreference preference, Filters filters,
|
||||
BiConsumer<String, Throwable> errorHandler) {
|
||||
return new RetryingMongoClient(replicaSet, preference, pool::client, filters, errorHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void close() {
|
||||
try {
|
||||
// Closing all connections ...
|
||||
logger().info("Closing all connections to {}", maskedConnectionSeed());
|
||||
pool.clear();
|
||||
}
|
||||
catch (Throwable e) {
|
||||
logger().error("Unexpected error shutting down the MongoDB clients", e);
|
||||
}
|
||||
public MongoDbConnection connect(ReplicaSet replicaSet, ReadPreference preference, Filters filters,
|
||||
MongoDbConnection.ErrorHandler errorHandler) {
|
||||
return new MongoDbConnection(replicaSet, preference, clientFactory, filters, errorHandler);
|
||||
}
|
||||
}
|
||||
|
@ -39,6 +39,14 @@ public static String buildFromHosts(String hosts) {
|
||||
return parseFromHosts(hosts).orElseThrow(() -> new DebeziumException("Unable to build connection string"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Appends new parameter to connection string
|
||||
*
|
||||
* @param connectionString original connection string
|
||||
* @param name parameter name
|
||||
* @param value parameter value
|
||||
* @return new connection string with added parameter
|
||||
*/
|
||||
public static String appendParameter(String connectionString, String name, String value) {
|
||||
var param = name + "=" + URLEncoder.encode(value, StandardCharsets.UTF_8);
|
||||
|
||||
@ -57,6 +65,12 @@ public static String appendParameter(String connectionString, String name, Strin
|
||||
return connectionString + "&" + param;
|
||||
}
|
||||
|
||||
/**
|
||||
* Mask credential information in connection string
|
||||
*
|
||||
* @param connectionString original connection string
|
||||
* @return connection string with masked credential information
|
||||
*/
|
||||
public static String mask(String connectionString) {
|
||||
var cs = new ConnectionString(connectionString);
|
||||
var credentials = cs.getCredential();
|
||||
|
@ -1,116 +0,0 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.connector.mongodb.connection;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import com.mongodb.ConnectionString;
|
||||
import com.mongodb.MongoClientSettings;
|
||||
import com.mongodb.MongoCredential;
|
||||
import com.mongodb.ReadPreference;
|
||||
import com.mongodb.client.MongoClient;
|
||||
|
||||
import io.debezium.annotation.ThreadSafe;
|
||||
|
||||
/**
|
||||
* A connection pool of MongoClient instances. This pool supports creating clients that communicate explicitly with a single
|
||||
* server, or clients that communicate with any members of a replica set or sharded cluster given a set of seed addresses.
|
||||
*
|
||||
* @author Randall Hauch
|
||||
*/
|
||||
@ThreadSafe
|
||||
public class MongoClients {
|
||||
|
||||
/**
|
||||
* Obtain a builder that can be used to configure and {@link Builder#build() create} a connection pool.
|
||||
*
|
||||
* @return the new builder; never null
|
||||
*/
|
||||
public static Builder create() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
/**
|
||||
* Configures and builds a ConnectionPool.
|
||||
*/
|
||||
public static class Builder {
|
||||
private final MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder();
|
||||
|
||||
/**
|
||||
* Add the given {@link MongoCredential} for use when creating clients.
|
||||
*
|
||||
* @param credential the credential; may be {@code null}, though this method does nothing if {@code null}
|
||||
* @return this builder object so methods can be chained; never null
|
||||
*/
|
||||
public Builder withCredential(MongoCredential credential) {
|
||||
if (credential != null) {
|
||||
settingsBuilder.credential(credential);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain the options builder for client connections.
|
||||
*
|
||||
* @return the option builder; never null
|
||||
*/
|
||||
public MongoClientSettings.Builder settings() {
|
||||
return settingsBuilder;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the client pool that will use the credentials and options already configured on this builder.
|
||||
*
|
||||
* @return the new client pool; never null
|
||||
*/
|
||||
public MongoClients build() {
|
||||
return new MongoClients(settingsBuilder);
|
||||
}
|
||||
}
|
||||
|
||||
private final Map<MongoClientSettings, MongoClient> connections = new ConcurrentHashMap<>();
|
||||
|
||||
private final MongoClientSettings defaultSettings;
|
||||
|
||||
private MongoClients(MongoClientSettings.Builder settings) {
|
||||
this.defaultSettings = settings.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates fresh {@link MongoClientSettings.Builder} from {@link #defaultSettings}
|
||||
* @return connection settings builder
|
||||
*/
|
||||
protected MongoClientSettings.Builder settings() {
|
||||
return MongoClientSettings.builder(defaultSettings);
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear out and close any open connections.
|
||||
*/
|
||||
public void clear() {
|
||||
connections.values().forEach(MongoClient::close);
|
||||
connections.clear();
|
||||
}
|
||||
|
||||
public MongoClient client(ConnectionString connectionString) {
|
||||
return client(settings -> settings.applyConnectionString(connectionString));
|
||||
}
|
||||
|
||||
public MongoClient client(ReplicaSet replicaSet, ReadPreference preference) {
|
||||
return client(settings -> settings
|
||||
.applyConnectionString(replicaSet.connectionString())
|
||||
.readPreference(preference));
|
||||
}
|
||||
|
||||
protected MongoClient client(Consumer<MongoClientSettings.Builder> configurator) {
|
||||
MongoClientSettings.Builder settings = settings();
|
||||
configurator.accept(settings);
|
||||
|
||||
return connections.computeIfAbsent(settings.build(), com.mongodb.client.MongoClients::create);
|
||||
}
|
||||
}
|
@ -0,0 +1,67 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.connector.mongodb.connection;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import com.mongodb.ConnectionString;
|
||||
import com.mongodb.MongoClientSettings;
|
||||
import com.mongodb.ReadPreference;
|
||||
import com.mongodb.client.MongoClient;
|
||||
|
||||
import io.debezium.annotation.ThreadSafe;
|
||||
|
||||
/**
|
||||
* A connection pool of MongoClient instances.
|
||||
*
|
||||
* @author Randall Hauch
|
||||
*/
|
||||
@ThreadSafe
|
||||
public final class MongoDbClientFactory {
|
||||
|
||||
private final MongoClientSettings defaultSettings;
|
||||
|
||||
/**
|
||||
* Obtains new client factory
|
||||
*
|
||||
* @param configurator settings instance use as template for all clients
|
||||
* @return mongo client factory
|
||||
*/
|
||||
public static MongoDbClientFactory create(Consumer<MongoClientSettings.Builder> configurator) {
|
||||
var settings = MongoClientSettings.builder();
|
||||
configurator.accept(settings);
|
||||
return new MongoDbClientFactory(settings);
|
||||
}
|
||||
|
||||
private MongoDbClientFactory(MongoClientSettings.Builder settings) {
|
||||
this.defaultSettings = settings.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates fresh {@link MongoClientSettings.Builder} from {@link #defaultSettings}
|
||||
* @return connection settings builder
|
||||
*/
|
||||
private MongoClientSettings.Builder settings() {
|
||||
return MongoClientSettings.builder(defaultSettings);
|
||||
}
|
||||
|
||||
public MongoClient client(ConnectionString connectionString) {
|
||||
return client(settings -> settings.applyConnectionString(connectionString));
|
||||
}
|
||||
|
||||
public MongoClient client(ReplicaSet replicaSet, ReadPreference preference) {
|
||||
return client(settings -> settings
|
||||
.applyConnectionString(replicaSet.connectionString())
|
||||
.readPreference(preference));
|
||||
}
|
||||
|
||||
private MongoClient client(Consumer<MongoClientSettings.Builder> configurator) {
|
||||
MongoClientSettings.Builder settings = settings();
|
||||
configurator.accept(settings);
|
||||
|
||||
return com.mongodb.client.MongoClients.create(settings.build());
|
||||
}
|
||||
}
|
@ -11,12 +11,11 @@
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import com.mongodb.ConnectionString;
|
||||
import com.mongodb.ReadPreference;
|
||||
import com.mongodb.client.MongoClient;
|
||||
|
||||
@ -29,39 +28,62 @@
|
||||
import io.debezium.util.Metronome;
|
||||
|
||||
/**
|
||||
* Client scoped to specified replica set.
|
||||
* Internally this wrapper attempts to obtain regular {@link MongoClient} instance using given back-off strategy
|
||||
* Scoped Mongodb Connection which applies filter configuration and replica set specification when required
|
||||
* Internally this wrapper attempts to obtain regular {@link MongoClient} instance
|
||||
*/
|
||||
public class RetryingMongoClient {
|
||||
public final class MongoDbConnection implements AutoCloseable {
|
||||
|
||||
@FunctionalInterface
|
||||
public interface ErrorHandler {
|
||||
/**
|
||||
*
|
||||
* @param desc the description of the operation, for logging purposes
|
||||
* @param error the error which triggered this call
|
||||
*/
|
||||
void onError(String desc, Throwable error);
|
||||
}
|
||||
|
||||
/**
|
||||
* A pause between failed MongoDB operations to prevent CPU throttling and DoS of
|
||||
* target MongoDB database.
|
||||
*/
|
||||
private static final Duration PAUSE_AFTER_ERROR = Duration.ofMillis(500);
|
||||
|
||||
public static ErrorHandler DEFAULT_ERROR_HANDLER = (String desc, Throwable error) -> {
|
||||
throw new DebeziumException("Error while attempting to " + desc, error);
|
||||
};
|
||||
|
||||
private final Filters filters;
|
||||
private final BiConsumer<String, Throwable> errorHandler;
|
||||
private final ErrorHandler errorHandler;
|
||||
private final AtomicBoolean running = new AtomicBoolean(true);
|
||||
private final String replicaSetName;
|
||||
private final String name;
|
||||
private final Supplier<MongoClient> connectionSupplier;
|
||||
|
||||
protected RetryingMongoClient(ReplicaSet replicaSet,
|
||||
ReadPreference readPreference,
|
||||
BiFunction<ReplicaSet, ReadPreference, MongoClient> connectionSupplier,
|
||||
Filters filters,
|
||||
BiConsumer<String, Throwable> errorHandler) {
|
||||
this.replicaSetName = replicaSet.replicaSetName();
|
||||
this.connectionSupplier = () -> connectionSupplier.apply(replicaSet, readPreference);
|
||||
protected MongoDbConnection(ReplicaSet replicaSet,
|
||||
ReadPreference readPreference,
|
||||
MongoDbClientFactory clientFactory,
|
||||
Filters filters,
|
||||
ErrorHandler errorHandler) {
|
||||
this.name = replicaSet.replicaSetName();
|
||||
this.connectionSupplier = () -> clientFactory.client(replicaSet, readPreference);
|
||||
this.filters = filters;
|
||||
this.errorHandler = errorHandler;
|
||||
}
|
||||
|
||||
protected MongoDbConnection(ConnectionString connectionString,
|
||||
MongoDbClientFactory clientFactory,
|
||||
Filters filters) {
|
||||
this.name = ConnectionStrings.mask(connectionString.getConnectionString());
|
||||
this.connectionSupplier = () -> clientFactory.client(connectionString);
|
||||
this.filters = filters;
|
||||
this.errorHandler = DEFAULT_ERROR_HANDLER;
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the supplied operation using the preferred node, blocking until it is available. Whenever the operation stops
|
||||
* (e.g., if the node is no longer of preferred type), then restart the operation using a current node of that type.
|
||||
* Execute the supplied operation. Whenever the operation fails the error handler is called and the operation is repeated
|
||||
*
|
||||
* @param desc the description of the operation, for logging purposes
|
||||
* @param operation the operation to be performed on a node of preferred type.
|
||||
* @param operation the operation to be performed
|
||||
*/
|
||||
public void execute(String desc, Consumer<MongoClient> operation) {
|
||||
execute(desc, client -> {
|
||||
@ -71,24 +93,22 @@ public void execute(String desc, Consumer<MongoClient> operation) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the supplied operation using the preferred node, blocking until it is available. Whenever the operation stops
|
||||
* (e.g., if the node is no longer of preferred type), then restart the operation using a current node of that type.
|
||||
* Execute the supplied operation. Whenever the operation fails the error handler is called and the operation is repeated
|
||||
*
|
||||
* @param desc the description of the operation, for logging purposes
|
||||
* @param operation the operation to be performed on a node of preferred type
|
||||
* @param operation the operation to be performed
|
||||
* @return return value of the executed operation
|
||||
*/
|
||||
public <T> T execute(String desc, Function<MongoClient, T> operation) {
|
||||
final Metronome errorMetronome = Metronome.sleeper(PAUSE_AFTER_ERROR, Clock.SYSTEM);
|
||||
MongoClient client = connectionSupplier.get();
|
||||
while (true) {
|
||||
try {
|
||||
try (var client = connectionSupplier.get()) {
|
||||
return operation.apply(client);
|
||||
}
|
||||
catch (Throwable t) {
|
||||
errorHandler.accept(desc, t);
|
||||
errorHandler.onError(desc, t);
|
||||
if (!isRunning()) {
|
||||
throw new DebeziumException("Operation failed and MongoDB connection '" + replicaSetName + "' termination requested", t);
|
||||
throw new DebeziumException("Operation failed and MongoDB connection '" + name + "' termination requested", t);
|
||||
}
|
||||
try {
|
||||
errorMetronome.pause();
|
||||
@ -120,9 +140,9 @@ public void executeBlocking(String desc, BlockingConsumer<MongoClient> operation
|
||||
throw e;
|
||||
}
|
||||
catch (Throwable t) {
|
||||
errorHandler.accept(desc, t);
|
||||
errorHandler.onError(desc, t);
|
||||
if (!isRunning()) {
|
||||
throw new DebeziumException("Operation failed and MongoDB connection '" + replicaSetName + "' termination requested", t);
|
||||
throw new DebeziumException("Operation failed and MongoDB connection to '" + name + "' termination requested", t);
|
||||
}
|
||||
errorMetronome.pause();
|
||||
}
|
||||
@ -130,9 +150,7 @@ public void executeBlocking(String desc, BlockingConsumer<MongoClient> operation
|
||||
}
|
||||
|
||||
/**
|
||||
* Use a node of preferred type to get the names of all the databases in the replica set, applying the current database
|
||||
* filter configuration. This method will block until a node of preferred type can be obtained to get the names of all
|
||||
* databases in the replica set.
|
||||
* Get the names of all the databases applying the current database filter configuration.
|
||||
*
|
||||
* @return the database names; never null but possibly empty
|
||||
*/
|
||||
@ -153,21 +171,18 @@ public Set<String> databaseNames() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Use a node of preferred type to get the identifiers of all the collections in the replica set, applying the current
|
||||
* collection filter configuration. This method will block until a primary can be obtained to get the
|
||||
* identifiers of all collections in the replica set.
|
||||
* Get the identifiers of all the collections, applying the current collection filter configuration.
|
||||
*
|
||||
* @return the collection identifiers; never null
|
||||
*/
|
||||
public List<CollectionId> collections() {
|
||||
// For each database, get the list of collections ...
|
||||
return execute("get collections in databases", client -> {
|
||||
List<CollectionId> collections = new ArrayList<>();
|
||||
Set<String> databaseNames = databaseNames();
|
||||
|
||||
for (String dbName : databaseNames) {
|
||||
MongoUtil.forEachCollectionNameInDatabase(client, dbName, collectionName -> {
|
||||
CollectionId collectionId = new CollectionId(replicaSetName, dbName, collectionName);
|
||||
CollectionId collectionId = new CollectionId(name, dbName, collectionName);
|
||||
|
||||
if (filters.collectionFilter().test(collectionId)) {
|
||||
collections.add(collectionId);
|
||||
@ -183,10 +198,8 @@ private boolean isRunning() {
|
||||
return running.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Terminates the execution loop of the current primary
|
||||
*/
|
||||
public void stop() {
|
||||
@Override
|
||||
public void close() {
|
||||
running.set(false);
|
||||
}
|
||||
}
|
@ -74,14 +74,7 @@ public void beforeEach() {
|
||||
|
||||
@After
|
||||
public void afterEach() {
|
||||
try {
|
||||
stopConnector();
|
||||
}
|
||||
finally {
|
||||
if (context != null) {
|
||||
context.getConnectionContext().close();
|
||||
}
|
||||
}
|
||||
stopConnector();
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
|
@ -9,7 +9,6 @@
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -17,8 +16,8 @@
|
||||
import com.mongodb.ReadPreference;
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.connector.mongodb.connection.MongoDbConnection;
|
||||
import io.debezium.connector.mongodb.connection.ReplicaSet;
|
||||
import io.debezium.connector.mongodb.connection.RetryingMongoClient;
|
||||
import io.debezium.util.Testing;
|
||||
|
||||
public abstract class AbstractMongoIT extends AbstractBaseMongoIT {
|
||||
@ -28,7 +27,7 @@ public abstract class AbstractMongoIT extends AbstractBaseMongoIT {
|
||||
protected Configuration config;
|
||||
protected MongoDbTaskContext context;
|
||||
protected ReplicaSet replicaSet;
|
||||
protected RetryingMongoClient primary;
|
||||
protected MongoDbConnection primary;
|
||||
|
||||
@Before
|
||||
public void beforeEach() {
|
||||
@ -88,12 +87,4 @@ private void initialize(boolean restartFromBeginning) {
|
||||
primary = context.getConnectionContext().connect(
|
||||
replicaSet, ReadPreference.primary(), context.filters(), TestHelper.connectionErrorHandler(3));
|
||||
}
|
||||
|
||||
@After
|
||||
public void afterEach() {
|
||||
if (context != null) {
|
||||
// close all connections
|
||||
context.getConnectionContext().close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -52,8 +52,8 @@ public void shouldUseSSL() throws InterruptedException, IOException {
|
||||
.with(MongoDbConnectorConfig.SERVER_SELECTION_TIMEOUT_MS, 2000)
|
||||
.build());
|
||||
|
||||
primary.executeBlocking("Try SSL connection", mongo -> {
|
||||
primary.stop();
|
||||
primary.execute("Try SSL connection", mongo -> {
|
||||
primary.close();
|
||||
mongo.getDatabase("dbit").listCollectionNames().first();
|
||||
});
|
||||
}
|
||||
|
@ -7,7 +7,6 @@
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
@ -21,13 +20,6 @@ public class MongoDbSchemaIT {
|
||||
private Configuration config;
|
||||
private MongoDbTaskContext taskContext;
|
||||
|
||||
@After
|
||||
public void afterEach() {
|
||||
if (taskContext != null) {
|
||||
taskContext.getConnectionContext().close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldAlwaysProduceCollectionSchema() {
|
||||
config = TestHelper.getConfiguration();
|
||||
|
@ -79,7 +79,7 @@ public void shouldGetFirstValidReplicaSetName() {
|
||||
|
||||
when(mongoClient.getClusterDescription()).thenReturn(clusterDescription);
|
||||
|
||||
ReplicaSets replicaSets = replicaSetDiscovery.getReplicaSets();
|
||||
ReplicaSets replicaSets = replicaSetDiscovery.getReplicaSets(mongoClient);
|
||||
assertThat(replicaSets.all().size()).isEqualTo(1);
|
||||
assertThat(replicaSets.all().get(0).replicaSetName()).isEqualTo("my_rs");
|
||||
}
|
||||
|
@ -10,11 +10,11 @@
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import io.debezium.connector.mongodb.connection.ConnectionStrings;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.mongodb.ConnectionString;
|
||||
|
||||
import io.debezium.connector.mongodb.connection.ConnectionStrings;
|
||||
import io.debezium.connector.mongodb.connection.ReplicaSet;
|
||||
|
||||
/**
|
||||
|
@ -11,7 +11,6 @@
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ -30,6 +29,7 @@
|
||||
import io.debezium.config.CommonConnectorConfig;
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.config.Configuration.Builder;
|
||||
import io.debezium.connector.mongodb.connection.MongoDbConnection;
|
||||
import io.debezium.testing.testcontainers.MongoDbReplicaSet;
|
||||
|
||||
/**
|
||||
@ -70,7 +70,7 @@ public static Configuration getConfiguration(String connectionString) {
|
||||
return cfgBuilder.build();
|
||||
}
|
||||
|
||||
public static BiConsumer<String, Throwable> connectionErrorHandler(int numErrorsBeforeFailing) {
|
||||
public static MongoDbConnection.ErrorHandler connectionErrorHandler(int numErrorsBeforeFailing) {
|
||||
AtomicInteger attempts = new AtomicInteger();
|
||||
return (desc, error) -> {
|
||||
if (attempts.incrementAndGet() > numErrorsBeforeFailing) {
|
||||
|
@ -85,14 +85,7 @@ public void beforeEach(Configuration config) {
|
||||
|
||||
@After
|
||||
public void afterEach() {
|
||||
try {
|
||||
stopConnector();
|
||||
}
|
||||
finally {
|
||||
if (context != null) {
|
||||
context.getConnectionContext().close();
|
||||
}
|
||||
}
|
||||
stopConnector();
|
||||
outboxEventRouter.close();
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user