DBZ-7260 Removed RS from MongoDbChangeEventSourceFactory

This commit is contained in:
Jakub Cechacek 2023-12-13 15:36:18 +01:00
parent 85fdf034e5
commit 86ce26bf4a
5 changed files with 7 additions and 19 deletions

View File

@ -44,7 +44,6 @@ public class MongoDbChangeEventSourceFactory implements ChangeEventSourceFactory
private final ErrorHandler errorHandler;
private final EventDispatcher<MongoDbPartition, CollectionId> dispatcher;
private final Clock clock;
private final ReplicaSet replicaSet;
private final MongoDbTaskContext taskContext;
private final MongoDbConnection.ChangeEventSourceConnectionFactory connections;
private final MongoDbSchema schema;
@ -52,13 +51,12 @@ public class MongoDbChangeEventSourceFactory implements ChangeEventSourceFactory
public MongoDbChangeEventSourceFactory(MongoDbConnectorConfig configuration, ErrorHandler errorHandler,
EventDispatcher<MongoDbPartition, CollectionId> dispatcher, Clock clock,
ReplicaSet replicaSets, MongoDbTaskContext taskContext, MongoDbSchema schema,
MongoDbTaskContext taskContext, MongoDbSchema schema,
MongoDbStreamingChangeEventSourceMetrics streamingMetrics) {
this.configuration = configuration;
this.errorHandler = errorHandler;
this.dispatcher = dispatcher;
this.clock = clock;
this.replicaSet = replicaSets;
this.taskContext = taskContext;
this.connections = createMongoDbConnectionFactory(taskContext.getConnectionContext());
this.schema = schema;
@ -72,7 +70,6 @@ public SnapshotChangeEventSource<MongoDbPartition, MongoDbOffsetContext> getSnap
configuration,
taskContext,
connections,
replicaSet,
dispatcher,
clock,
snapshotProgressListener,
@ -86,7 +83,6 @@ public StreamingChangeEventSource<MongoDbPartition, MongoDbOffsetContext> getStr
configuration,
taskContext,
connections,
replicaSet,
dispatcher,
errorHandler,
clock,
@ -102,7 +98,6 @@ public StreamingChangeEventSource<MongoDbPartition, MongoDbOffsetContext> getStr
final MongoDbIncrementalSnapshotChangeEventSource incrementalSnapshotChangeEventSource = new MongoDbIncrementalSnapshotChangeEventSource(
configuration,
connections,
replicaSet,
dispatcher,
schema,
clock,

View File

@ -24,7 +24,6 @@
import io.debezium.config.Field;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.mongodb.connection.ReplicaSet;
import io.debezium.connector.mongodb.metrics.MongoDbChangeEventSourceMetricsFactory;
import io.debezium.document.DocumentReader;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
@ -83,7 +82,6 @@ public ChangeEventSourceCoordinator<MongoDbPartition, MongoDbOffsetContext> star
final Schema structSchema = connectorConfig.getSourceInfoStructMaker().schema();
this.schema = new MongoDbSchema(taskContext.filters(), taskContext.topicNamingStrategy(), structSchema, schemaNameAdjuster);
final ReplicaSet replicaSet = getReplicaSet(connectorConfig);
final MongoDbOffsetContext previousOffset = getPreviousOffset(connectorConfig);
final Clock clock = Clock.system();
@ -143,7 +141,6 @@ public ChangeEventSourceCoordinator<MongoDbPartition, MongoDbOffsetContext> star
errorHandler,
dispatcher,
clock,
replicaSet,
taskContext,
schema,
metricsFactory.getStreamingMetrics(taskContext, queue, metadataProvider)),
@ -201,10 +198,6 @@ private MongoDbOffsetContext getPreviousOffset(MongoDbConnectorConfig connectorC
}
}
private ReplicaSet getReplicaSet(MongoDbConnectorConfig connectorConfig) {
return connectorConfig.getReplicaSet();
}
@Override
protected Configuration withMaskedSensitiveOptions(Configuration config) {
return super.withMaskedSensitiveOptions(config).withMasked(MongoDbConnectorConfig.CONNECTION_STRING.name());

View File

@ -82,7 +82,7 @@ public class MongoDbSnapshotChangeEventSource extends AbstractSnapshotChangeEven
private final AtomicBoolean aborted = new AtomicBoolean(false);
public MongoDbSnapshotChangeEventSource(MongoDbConnectorConfig connectorConfig, MongoDbTaskContext taskContext,
MongoDbConnection.ChangeEventSourceConnectionFactory connections, ReplicaSet replicaSet,
MongoDbConnection.ChangeEventSourceConnectionFactory connections,
EventDispatcher<MongoDbPartition, CollectionId> dispatcher, Clock clock,
SnapshotProgressListener<MongoDbPartition> snapshotProgressListener, ErrorHandler errorHandler,
NotificationService<MongoDbPartition, MongoDbOffsetContext> notificationService) {
@ -90,7 +90,7 @@ public MongoDbSnapshotChangeEventSource(MongoDbConnectorConfig connectorConfig,
this.connectorConfig = connectorConfig;
this.taskContext = taskContext;
this.connections = connections;
this.replicaSet = replicaSet;
this.replicaSet = connectorConfig.getReplicaSet();
this.dispatcher = dispatcher;
this.clock = clock;
this.snapshotProgressListener = snapshotProgressListener;

View File

@ -53,7 +53,7 @@ public class MongoDbStreamingChangeEventSource implements StreamingChangeEventSo
private MongoDbOffsetContext effectiveOffset;
public MongoDbStreamingChangeEventSource(MongoDbConnectorConfig connectorConfig, MongoDbTaskContext taskContext,
MongoDbConnection.ChangeEventSourceConnectionFactory connections, ReplicaSet replicaSet,
MongoDbConnection.ChangeEventSourceConnectionFactory connections,
EventDispatcher<MongoDbPartition, CollectionId> dispatcher,
ErrorHandler errorHandler, Clock clock, MongoDbStreamingChangeEventSourceMetrics streamingMetrics) {
this.connectorConfig = connectorConfig;
@ -61,7 +61,7 @@ public MongoDbStreamingChangeEventSource(MongoDbConnectorConfig connectorConfig,
this.dispatcher = dispatcher;
this.errorHandler = errorHandler;
this.clock = clock;
this.replicaSet = replicaSet;
this.replicaSet = connectorConfig.getReplicaSet();
this.taskContext = taskContext;
this.connections = connections;
this.streamingMetrics = streamingMetrics;

View File

@ -101,7 +101,7 @@ public class MongoDbIncrementalSnapshotChangeEventSource
private final ExecutorService incrementalSnapshotThreadPool;
public MongoDbIncrementalSnapshotChangeEventSource(MongoDbConnectorConfig config,
MongoDbConnection.ChangeEventSourceConnectionFactory connections, ReplicaSet replicaSet,
MongoDbConnection.ChangeEventSourceConnectionFactory connections,
EventDispatcher<MongoDbPartition, CollectionId> dispatcher,
MongoDbSchema collectionSchema,
Clock clock,
@ -109,7 +109,7 @@ public MongoDbIncrementalSnapshotChangeEventSource(MongoDbConnectorConfig config
DataChangeEventListener<MongoDbPartition> dataChangeEventListener,
NotificationService<MongoDbPartition, ? extends OffsetContext> notificationService) {
this.connectorConfig = config;
this.replicaSet = replicaSet;
this.replicaSet = config.getReplicaSet();
this.connections = connections;
this.dispatcher = dispatcher;
this.collectionSchema = collectionSchema;