From 7d3cca89bd6475d413ea66aa7e7d8ed6c73c607a Mon Sep 17 00:00:00 2001 From: jcechace Date: Wed, 1 Feb 2023 23:52:23 +0100 Subject: [PATCH] DBZ-6032 MongoConnection allways rethrows InterruptedException --- ...bIncrementalSnapshotChangeEventSource.java | 9 ++-- .../MongoDbSnapshotChangeEventSource.java | 11 +++-- .../MongoDbStreamingChangeEventSource.java | 7 ++-- .../mongodb/connection/MongoDbConnection.java | 42 +++---------------- .../connector/mongodb/ConnectionIT.java | 2 +- .../debezium/function/BlockingFunction.java | 29 +++++++++++++ 6 files changed, 49 insertions(+), 51 deletions(-) create mode 100644 debezium-core/src/main/java/io/debezium/function/BlockingFunction.java diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java index ff52e8416..66786bb05 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java @@ -179,7 +179,8 @@ protected void deduplicateWindow(DataCollectionId dataCollectionId, Object key) protected void emitWindowOpen() throws InterruptedException { final CollectionId collectionId = signallingCollectionId; final String id = context.currentChunkId() + "-open"; - mongo.executeBlocking( + + mongo.execute( "emit window open for chunk '" + context.currentChunkId() + "'", client -> { final MongoDatabase database = client.getDatabase(collectionId.dbName()); @@ -200,7 +201,7 @@ protected void emitWindowOpen() throws InterruptedException { protected void emitWindowClose() throws InterruptedException { final CollectionId collectionId = signallingCollectionId; final String id = context.currentChunkId() + "-close"; - mongo.executeBlocking( + mongo.execute( "emit window close for chunk '" + context.currentChunkId() + "'", client -> { final MongoDatabase database = client.getDatabase(collectionId.dbName()); @@ -322,7 +323,7 @@ private void nextDataCollection(MongoDbPartition partition) { private Object[] readMaximumKey() throws InterruptedException { final CollectionId collectionId = (CollectionId) currentCollection.id(); final AtomicReference key = new AtomicReference<>(); - mongo.executeBlocking("maximum key for '" + collectionId + "'", client -> { + mongo.execute("maximum key for '" + collectionId + "'", client -> { final MongoDatabase database = client.getDatabase(collectionId.dbName()); final MongoCollection collection = database.getCollection(collectionId.name()); @@ -417,7 +418,7 @@ private void createDataEventsForDataCollection(MongoDbPartition partition) throw long exportStart = clock.currentTimeInMillis(); LOGGER.debug("Exporting data chunk from collection '{}' (total {} collections)", currentCollection.id(), context.dataCollectionsToBeSnapshottedCount()); - mongo.executeBlocking("chunk query key for '" + currentCollection.id() + "'", client -> { + mongo.execute("chunk query key for '" + currentCollection.id() + "'", client -> { final MongoDatabase database = client.getDatabase(collectionId.dbName()); final MongoCollection collection = database.getCollection(collectionId.name(), BsonDocument.class); diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java index 1a6d012ba..2e9b69cda 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java @@ -192,8 +192,7 @@ protected SnapshottingTask getSnapshottingTask(MongoDbPartition partition, Mongo } @Override - protected SnapshotContext prepare(MongoDbPartition partition) - throws Exception { + protected SnapshotContext prepare(MongoDbPartition partition) { return new MongoDbSnapshotContext(partition); } @@ -264,9 +263,9 @@ else if (lastRecordedTs.compareTo(firstAvailableTs) < 0) { return performSnapshot; } - protected void determineSnapshotOffsets(MongoDbSnapshotContext ctx, ReplicaSets replicaSets) { + protected void determineSnapshotOffsets(MongoDbSnapshotContext ctx, ReplicaSets replicaSets) throws InterruptedException { final Map positions = new LinkedHashMap<>(); - replicaSets.onEachReplicaSet(replicaSet -> { + for (var replicaSet : replicaSets.all()) { LOGGER.info("Determine Snapshot Offset for replica-set {}", replicaSet.replicaSetName()); try (MongoDbConnection mongo = establishConnection(ctx.partition, replicaSet, ReadPreference.primaryPreferred())) { @@ -274,7 +273,7 @@ protected void determineSnapshotOffsets(MongoDbSnapshotContext ctx, ReplicaSets positions.put(replicaSet, MongoUtil.getOplogEntry(client, -1, LOGGER)); }); } - }); + } ctx.offset = new MongoDbOffsetContext(new SourceInfo(connectorConfig), new TransactionContext(), new MongoDbIncrementalSnapshotContext<>(false), positions); @@ -411,7 +410,7 @@ private void createDataEventsForCollection(ChangeEventSourceContext sourceContex long exportStart = clock.currentTimeInMillis(); LOGGER.info("\t Exporting data for collection '{}'", collectionId); - mongo.executeBlocking("sync '" + collectionId + "'", client -> { + mongo.execute("sync '" + collectionId + "'", client -> { final MongoDatabase database = client.getDatabase(collectionId.dbName()); final MongoCollection collection = database.getCollection(collectionId.name(), BsonDocument.class); diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java index f42635a1d..2e47fc886 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java @@ -253,9 +253,10 @@ else if (oplogStart.getTime() > 0) { } protected MongoDbOffsetContext initializeOffsets(MongoDbConnectorConfig connectorConfig, MongoDbPartition partition, - ReplicaSets replicaSets) { + ReplicaSets replicaSets) + throws InterruptedException { final Map positions = new LinkedHashMap<>(); - replicaSets.onEachReplicaSet(replicaSet -> { + for (var replicaSet : replicaSets.all()) { LOGGER.info("Determine Snapshot Offset for replica-set {}", replicaSet.replicaSetName()); try (MongoDbConnection mongo = establishConnection(partition, replicaSet, ReadPreference.primaryPreferred())) { @@ -263,7 +264,7 @@ protected MongoDbOffsetContext initializeOffsets(MongoDbConnectorConfig connecto positions.put(replicaSet, MongoUtil.getOplogEntry(client, -1, LOGGER)); }); } - }); + } return new MongoDbOffsetContext(new SourceInfo(connectorConfig), new TransactionContext(), new MongoDbIncrementalSnapshotContext<>(false), positions); diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/MongoDbConnection.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/MongoDbConnection.java index 8789d622b..72b7fa049 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/MongoDbConnection.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/MongoDbConnection.java @@ -11,8 +11,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; -import java.util.function.Function; import java.util.function.Supplier; import com.mongodb.ConnectionString; @@ -24,6 +22,7 @@ import io.debezium.connector.mongodb.Filters; import io.debezium.connector.mongodb.MongoUtil; import io.debezium.function.BlockingConsumer; +import io.debezium.function.BlockingFunction; import io.debezium.util.Clock; import io.debezium.util.Metronome; @@ -85,7 +84,7 @@ protected MongoDbConnection(ConnectionString connectionString, * @param desc the description of the operation, for logging purposes * @param operation the operation to be performed */ - public void execute(String desc, Consumer operation) { + public void execute(String desc, BlockingConsumer operation) throws InterruptedException { execute(desc, client -> { operation.accept(client); return null; @@ -99,43 +98,12 @@ public void execute(String desc, Consumer operation) { * @param operation the operation to be performed * @return return value of the executed operation */ - public T execute(String desc, Function operation) { + public T execute(String desc, BlockingFunction operation) throws InterruptedException { final Metronome errorMetronome = Metronome.sleeper(PAUSE_AFTER_ERROR, Clock.SYSTEM); while (true) { try (var client = connectionSupplier.get()) { return operation.apply(client); } - catch (Throwable t) { - errorHandler.onError(desc, t); - if (!isRunning()) { - throw new DebeziumException("Operation failed and MongoDB connection '" + name + "' termination requested", t); - } - try { - errorMetronome.pause(); - } - catch (InterruptedException e) { - // Interruption is not propagated - } - } - } - } - - /** - * Execute the supplied operation using the preferred node, blocking until it is available. Whenever the operation stops - * (e.g., if the node is no longer of preferred type), then restart the operation using a current node of that type. - * - * @param desc the description of the operation, for logging purposes - * @param operation the operation to be performed on a node of preferred type. - * @throws InterruptedException if the operation was interrupted - */ - public void executeBlocking(String desc, BlockingConsumer operation) throws InterruptedException { - final Metronome errorMetronome = Metronome.sleeper(PAUSE_AFTER_ERROR, Clock.SYSTEM); - while (true) { - MongoClient client = connectionSupplier.get(); - try { - operation.accept(client); - return; - } catch (InterruptedException e) { throw e; } @@ -154,7 +122,7 @@ public void executeBlocking(String desc, BlockingConsumer operation * * @return the database names; never null but possibly empty */ - public Set databaseNames() { + public Set databaseNames() throws InterruptedException { return execute("get database names", client -> { Set databaseNames = new HashSet<>(); @@ -175,7 +143,7 @@ public Set databaseNames() { * * @return the collection identifiers; never null */ - public List collections() { + public List collections() throws InterruptedException { return execute("get collections in databases", client -> { List collections = new ArrayList<>(); Set databaseNames = databaseNames(); diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ConnectionIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ConnectionIT.java index f6a4b2c49..8f71fa265 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ConnectionIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ConnectionIT.java @@ -58,7 +58,7 @@ public void shouldUseSSL() throws InterruptedException, IOException { } @Test - public void shouldCreateMovieDatabase() { + public void shouldCreateMovieDatabase() throws InterruptedException { useConfiguration(config.edit() .with(MongoDbConnectorConfig.DATABASE_INCLUDE_LIST, "dbA,dbB") .with(MongoDbConnectorConfig.COLLECTION_EXCLUDE_LIST, "dbB.moviesB") diff --git a/debezium-core/src/main/java/io/debezium/function/BlockingFunction.java b/debezium-core/src/main/java/io/debezium/function/BlockingFunction.java new file mode 100644 index 000000000..243add9ac --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/function/BlockingFunction.java @@ -0,0 +1,29 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.function; + +import java.util.Objects; +import java.util.function.Function; + +/** + * A variant of {@link Function} that can be blocked and interrupted. + * @param the type of the input to the function + * @param the type of the result of the function* @author jcechace + */ +@FunctionalInterface +public interface BlockingFunction { + + R apply(T t) throws InterruptedException; + + default BlockingFunction andThen(BlockingFunction after) { + Objects.requireNonNull(after); + return (T t) -> after.apply(apply(t)); + } + + static BlockingFunction identity() { + return t -> t; + } +}