DBZ-6032 MongoConnection allways rethrows InterruptedException
This commit is contained in:
parent
67d3f5b373
commit
7d3cca89bd
@ -179,7 +179,8 @@ protected void deduplicateWindow(DataCollectionId dataCollectionId, Object key)
|
||||
protected void emitWindowOpen() throws InterruptedException {
|
||||
final CollectionId collectionId = signallingCollectionId;
|
||||
final String id = context.currentChunkId() + "-open";
|
||||
mongo.executeBlocking(
|
||||
|
||||
mongo.execute(
|
||||
"emit window open for chunk '" + context.currentChunkId() + "'",
|
||||
client -> {
|
||||
final MongoDatabase database = client.getDatabase(collectionId.dbName());
|
||||
@ -200,7 +201,7 @@ protected void emitWindowOpen() throws InterruptedException {
|
||||
protected void emitWindowClose() throws InterruptedException {
|
||||
final CollectionId collectionId = signallingCollectionId;
|
||||
final String id = context.currentChunkId() + "-close";
|
||||
mongo.executeBlocking(
|
||||
mongo.execute(
|
||||
"emit window close for chunk '" + context.currentChunkId() + "'",
|
||||
client -> {
|
||||
final MongoDatabase database = client.getDatabase(collectionId.dbName());
|
||||
@ -322,7 +323,7 @@ private void nextDataCollection(MongoDbPartition partition) {
|
||||
private Object[] readMaximumKey() throws InterruptedException {
|
||||
final CollectionId collectionId = (CollectionId) currentCollection.id();
|
||||
final AtomicReference<Object> key = new AtomicReference<>();
|
||||
mongo.executeBlocking("maximum key for '" + collectionId + "'", client -> {
|
||||
mongo.execute("maximum key for '" + collectionId + "'", client -> {
|
||||
final MongoDatabase database = client.getDatabase(collectionId.dbName());
|
||||
final MongoCollection<Document> collection = database.getCollection(collectionId.name());
|
||||
|
||||
@ -417,7 +418,7 @@ private void createDataEventsForDataCollection(MongoDbPartition partition) throw
|
||||
long exportStart = clock.currentTimeInMillis();
|
||||
LOGGER.debug("Exporting data chunk from collection '{}' (total {} collections)", currentCollection.id(), context.dataCollectionsToBeSnapshottedCount());
|
||||
|
||||
mongo.executeBlocking("chunk query key for '" + currentCollection.id() + "'", client -> {
|
||||
mongo.execute("chunk query key for '" + currentCollection.id() + "'", client -> {
|
||||
final MongoDatabase database = client.getDatabase(collectionId.dbName());
|
||||
final MongoCollection<BsonDocument> collection = database.getCollection(collectionId.name(), BsonDocument.class);
|
||||
|
||||
|
@ -192,8 +192,7 @@ protected SnapshottingTask getSnapshottingTask(MongoDbPartition partition, Mongo
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SnapshotContext<MongoDbPartition, MongoDbOffsetContext> prepare(MongoDbPartition partition)
|
||||
throws Exception {
|
||||
protected SnapshotContext<MongoDbPartition, MongoDbOffsetContext> prepare(MongoDbPartition partition) {
|
||||
return new MongoDbSnapshotContext(partition);
|
||||
}
|
||||
|
||||
@ -264,9 +263,9 @@ else if (lastRecordedTs.compareTo(firstAvailableTs) < 0) {
|
||||
return performSnapshot;
|
||||
}
|
||||
|
||||
protected void determineSnapshotOffsets(MongoDbSnapshotContext ctx, ReplicaSets replicaSets) {
|
||||
protected void determineSnapshotOffsets(MongoDbSnapshotContext ctx, ReplicaSets replicaSets) throws InterruptedException {
|
||||
final Map<ReplicaSet, BsonDocument> positions = new LinkedHashMap<>();
|
||||
replicaSets.onEachReplicaSet(replicaSet -> {
|
||||
for (var replicaSet : replicaSets.all()) {
|
||||
LOGGER.info("Determine Snapshot Offset for replica-set {}", replicaSet.replicaSetName());
|
||||
|
||||
try (MongoDbConnection mongo = establishConnection(ctx.partition, replicaSet, ReadPreference.primaryPreferred())) {
|
||||
@ -274,7 +273,7 @@ protected void determineSnapshotOffsets(MongoDbSnapshotContext ctx, ReplicaSets
|
||||
positions.put(replicaSet, MongoUtil.getOplogEntry(client, -1, LOGGER));
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
ctx.offset = new MongoDbOffsetContext(new SourceInfo(connectorConfig), new TransactionContext(),
|
||||
new MongoDbIncrementalSnapshotContext<>(false), positions);
|
||||
@ -411,7 +410,7 @@ private void createDataEventsForCollection(ChangeEventSourceContext sourceContex
|
||||
long exportStart = clock.currentTimeInMillis();
|
||||
LOGGER.info("\t Exporting data for collection '{}'", collectionId);
|
||||
|
||||
mongo.executeBlocking("sync '" + collectionId + "'", client -> {
|
||||
mongo.execute("sync '" + collectionId + "'", client -> {
|
||||
final MongoDatabase database = client.getDatabase(collectionId.dbName());
|
||||
final MongoCollection<BsonDocument> collection = database.getCollection(collectionId.name(), BsonDocument.class);
|
||||
|
||||
|
@ -253,9 +253,10 @@ else if (oplogStart.getTime() > 0) {
|
||||
}
|
||||
|
||||
protected MongoDbOffsetContext initializeOffsets(MongoDbConnectorConfig connectorConfig, MongoDbPartition partition,
|
||||
ReplicaSets replicaSets) {
|
||||
ReplicaSets replicaSets)
|
||||
throws InterruptedException {
|
||||
final Map<ReplicaSet, BsonDocument> positions = new LinkedHashMap<>();
|
||||
replicaSets.onEachReplicaSet(replicaSet -> {
|
||||
for (var replicaSet : replicaSets.all()) {
|
||||
LOGGER.info("Determine Snapshot Offset for replica-set {}", replicaSet.replicaSetName());
|
||||
|
||||
try (MongoDbConnection mongo = establishConnection(partition, replicaSet, ReadPreference.primaryPreferred())) {
|
||||
@ -263,7 +264,7 @@ protected MongoDbOffsetContext initializeOffsets(MongoDbConnectorConfig connecto
|
||||
positions.put(replicaSet, MongoUtil.getOplogEntry(client, -1, LOGGER));
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return new MongoDbOffsetContext(new SourceInfo(connectorConfig), new TransactionContext(),
|
||||
new MongoDbIncrementalSnapshotContext<>(false), positions);
|
||||
|
@ -11,8 +11,6 @@
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import com.mongodb.ConnectionString;
|
||||
@ -24,6 +22,7 @@
|
||||
import io.debezium.connector.mongodb.Filters;
|
||||
import io.debezium.connector.mongodb.MongoUtil;
|
||||
import io.debezium.function.BlockingConsumer;
|
||||
import io.debezium.function.BlockingFunction;
|
||||
import io.debezium.util.Clock;
|
||||
import io.debezium.util.Metronome;
|
||||
|
||||
@ -85,7 +84,7 @@ protected MongoDbConnection(ConnectionString connectionString,
|
||||
* @param desc the description of the operation, for logging purposes
|
||||
* @param operation the operation to be performed
|
||||
*/
|
||||
public void execute(String desc, Consumer<MongoClient> operation) {
|
||||
public void execute(String desc, BlockingConsumer<MongoClient> operation) throws InterruptedException {
|
||||
execute(desc, client -> {
|
||||
operation.accept(client);
|
||||
return null;
|
||||
@ -99,43 +98,12 @@ public void execute(String desc, Consumer<MongoClient> operation) {
|
||||
* @param operation the operation to be performed
|
||||
* @return return value of the executed operation
|
||||
*/
|
||||
public <T> T execute(String desc, Function<MongoClient, T> operation) {
|
||||
public <T> T execute(String desc, BlockingFunction<MongoClient, T> operation) throws InterruptedException {
|
||||
final Metronome errorMetronome = Metronome.sleeper(PAUSE_AFTER_ERROR, Clock.SYSTEM);
|
||||
while (true) {
|
||||
try (var client = connectionSupplier.get()) {
|
||||
return operation.apply(client);
|
||||
}
|
||||
catch (Throwable t) {
|
||||
errorHandler.onError(desc, t);
|
||||
if (!isRunning()) {
|
||||
throw new DebeziumException("Operation failed and MongoDB connection '" + name + "' termination requested", t);
|
||||
}
|
||||
try {
|
||||
errorMetronome.pause();
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
// Interruption is not propagated
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the supplied operation using the preferred node, blocking until it is available. Whenever the operation stops
|
||||
* (e.g., if the node is no longer of preferred type), then restart the operation using a current node of that type.
|
||||
*
|
||||
* @param desc the description of the operation, for logging purposes
|
||||
* @param operation the operation to be performed on a node of preferred type.
|
||||
* @throws InterruptedException if the operation was interrupted
|
||||
*/
|
||||
public void executeBlocking(String desc, BlockingConsumer<MongoClient> operation) throws InterruptedException {
|
||||
final Metronome errorMetronome = Metronome.sleeper(PAUSE_AFTER_ERROR, Clock.SYSTEM);
|
||||
while (true) {
|
||||
MongoClient client = connectionSupplier.get();
|
||||
try {
|
||||
operation.accept(client);
|
||||
return;
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
throw e;
|
||||
}
|
||||
@ -154,7 +122,7 @@ public void executeBlocking(String desc, BlockingConsumer<MongoClient> operation
|
||||
*
|
||||
* @return the database names; never null but possibly empty
|
||||
*/
|
||||
public Set<String> databaseNames() {
|
||||
public Set<String> databaseNames() throws InterruptedException {
|
||||
return execute("get database names", client -> {
|
||||
Set<String> databaseNames = new HashSet<>();
|
||||
|
||||
@ -175,7 +143,7 @@ public Set<String> databaseNames() {
|
||||
*
|
||||
* @return the collection identifiers; never null
|
||||
*/
|
||||
public List<CollectionId> collections() {
|
||||
public List<CollectionId> collections() throws InterruptedException {
|
||||
return execute("get collections in databases", client -> {
|
||||
List<CollectionId> collections = new ArrayList<>();
|
||||
Set<String> databaseNames = databaseNames();
|
||||
|
@ -58,7 +58,7 @@ public void shouldUseSSL() throws InterruptedException, IOException {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldCreateMovieDatabase() {
|
||||
public void shouldCreateMovieDatabase() throws InterruptedException {
|
||||
useConfiguration(config.edit()
|
||||
.with(MongoDbConnectorConfig.DATABASE_INCLUDE_LIST, "dbA,dbB")
|
||||
.with(MongoDbConnectorConfig.COLLECTION_EXCLUDE_LIST, "dbB.moviesB")
|
||||
|
@ -0,0 +1,29 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.function;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* A variant of {@link Function} that can be blocked and interrupted.
|
||||
* @param <T> the type of the input to the function
|
||||
* @param <R> the type of the result of the function* @author jcechace
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface BlockingFunction<T, R> {
|
||||
|
||||
R apply(T t) throws InterruptedException;
|
||||
|
||||
default <V> BlockingFunction<T, V> andThen(BlockingFunction<? super R, ? extends V> after) {
|
||||
Objects.requireNonNull(after);
|
||||
return (T t) -> after.apply(apply(t));
|
||||
}
|
||||
|
||||
static <T> BlockingFunction<T, T> identity() {
|
||||
return t -> t;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user