DBZ-7260 Merged connection string and task connection string properties to futher simplify RS name usage

This commit is contained in:
Jakub Cechacek 2023-12-17 15:43:05 +01:00
parent 8def3bf82c
commit 4c64e28832
12 changed files with 54 additions and 67 deletions

View File

@ -26,7 +26,6 @@
import io.debezium.connector.mongodb.connection.ConnectionContext;
import io.debezium.connector.mongodb.connection.ConnectionStrings;
import io.debezium.connector.mongodb.connection.MongoDbConnection;
import io.debezium.connector.mongodb.connection.ReplicaSet;
/**
* A Kafka Connect source connector that creates {@link MongoDbConnectorTask tasks} that replicate the context of one or more
@ -110,18 +109,18 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
return Collections.emptyList();
}
// Partitioning the replica sets amongst the number of tasks ...
List<Map<String, String>> taskConfigs = new ArrayList<>(maxTasks);
// ensure connection string has replicaSet when possible
var taskConnectionString = connectionContext.resolveTaskConnectionString();
logger.info("Configuring MongoDB connector task to capture events for connections to: {}", ConnectionStrings.mask(taskConnectionString));
taskConfigs.add(config.edit()
.with(MongoDbConnectorConfig.TASK_CONNECTION_STRING, taskConnectionString)
var taskConfig = config.edit()
.with(MongoDbConnectorConfig.CONNECTION_STRING, taskConnectionString)
.with(MongoDbConnectorConfig.TASK_ID, 0)
.build()
.asMap());
logger.debug("Configuring {} MongoDB connector task(s)", taskConfigs.size());
return taskConfigs;
.asMap();
logger.debug("Configuring MongoDB connector task");
return List.of(taskConfig);
}
@Override
@ -184,7 +183,6 @@ public List<CollectionId> getMatchingCollections(Configuration config) {
private MongoDbConnection getConnection(Configuration config) {
MongoDbTaskContext context = new MongoDbTaskContext(config);
ReplicaSet replicaSet = new ReplicaSet(context.getConnectionContext().connectionString());
return context.getConnectionContext().connect(context.getConnectionContext().connectionString(), context.filters(), (s, throwable) -> {
throw new DebeziumException(s, throwable);
});

View File

@ -501,13 +501,6 @@ public static OversizeHandlingMode parse(String value, String defaultValue) {
protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 0;
/**
* The task connection string
*/
public static final Field TASK_CONNECTION_STRING = Field.createInternal("mongodb.internal.task.connection.string")
.withDescription("Internal use only")
.withType(Type.LIST);
public static final Field ALLOW_OFFSET_INVALIDATION = Field.createInternal("mongodb.allow.offset.invalidation")
.withDescription("Allows offset invalidation when required by change of connection mode")
.withDefault(false)
@ -895,7 +888,7 @@ public static ConfigDef configDef() {
private final boolean offsetInvalidationAllowed;
private final int snapshotMaxThreads;
private final int cursorMaxAwaitTimeMs;
private final Optional<ConnectionString> taskConnectionString;
private final ConnectionString connectionString;
private final CursorPipelineOrder cursorPipelineOrder;
private final OversizeHandlingMode oversizeHandlingMode;
private final FiltersMatchMode filtersMatchMode;
@ -929,7 +922,7 @@ public MongoDbConnectorConfig(Configuration config) {
this.snapshotMaxThreads = resolveSnapshotMaxThreads(config);
this.cursorMaxAwaitTimeMs = config.getInteger(MongoDbConnectorConfig.CURSOR_MAX_AWAIT_TIME_MS, 0);
this.taskConnectionString = resolveTaskConnectionString(config);
this.connectionString = resolveConnectionString(config);
}
private static int validateHosts(Configuration config, Field field, ValidationOutput problems) {
@ -1110,8 +1103,12 @@ public boolean isOffsetInvalidationAllowed() {
return offsetInvalidationAllowed;
}
public Optional<ConnectionString> getTaskConnectionString() {
return taskConnectionString;
public ConnectionString getConnectionString() {
return connectionString;
}
public String getReplicaSetName() {
return ConnectionStrings.replicaSetName(connectionString);
}
public int getCursorMaxAwaitTime() {
@ -1188,10 +1185,9 @@ private static int resolveSnapshotMaxThreads(Configuration config) {
return config.getInteger(SNAPSHOT_MAX_THREADS);
}
private static Optional<ConnectionString> resolveTaskConnectionString(Configuration config) {
var connectionString = config.getString(MongoDbConnectorConfig.TASK_CONNECTION_STRING);
return Optional.ofNullable(connectionString)
.map(ConnectionString::new);
private static ConnectionString resolveConnectionString(Configuration config) {
var connectionString = config.getString(MongoDbConnectorConfig.CONNECTION_STRING);
return connectionString != null ? new ConnectionString(connectionString) : null;
}
@Override

View File

@ -53,8 +53,6 @@ public final class MongoDbConnectorTask extends BaseSourceTask<MongoDbPartition,
private static final String CONTEXT_NAME = "mongodb-connector-task";
private final Logger logger = LoggerFactory.getLogger(getClass());
// These are all effectively constants between start(...) and stop(...)
private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile String taskName;
@ -79,8 +77,8 @@ public ChangeEventSourceCoordinator<MongoDbPartition, MongoDbOffsetContext> star
this.schema = new MongoDbSchema(taskContext.filters(), taskContext.topicNamingStrategy(), structSchema, schemaNameAdjuster);
final Offsets<MongoDbPartition, MongoDbOffsetContext> previousOffset = getPreviousOffsets(
new MongoDbPartition.Provider(taskContext),
new MongoDbOffsetContext.Loader(taskContext));
new MongoDbPartition.Provider(connectorConfig),
new MongoDbOffsetContext.Loader(connectorConfig));
final Clock clock = Clock.system();
PreviousContext previousLogContext = taskContext.configureLoggingContext(taskName);

View File

@ -26,7 +26,6 @@
import io.debezium.DebeziumException;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.mongodb.connection.ConnectionStrings;
import io.debezium.connector.mongodb.events.BufferingChangeStreamCursor;
import io.debezium.connector.mongodb.snapshot.MongoDbIncrementalSnapshotContext;
import io.debezium.pipeline.CommonOffsetContext;
@ -220,26 +219,24 @@ private static boolean booleanOffsetValue(Map<String, ?> values, String key) {
return false;
}
public static MongoDbOffsetContext empty(MongoDbConnectorConfig connectorConfig, String replicaSetName) {
public static MongoDbOffsetContext empty(MongoDbConnectorConfig connectorConfig) {
return new MongoDbOffsetContext(
new SourceInfo(connectorConfig, replicaSetName),
new SourceInfo(connectorConfig),
new TransactionContext(),
new MongoDbIncrementalSnapshotContext<>(false));
}
public static class Loader implements OffsetContext.Loader<MongoDbOffsetContext> {
private final MongoDbTaskContext taskContext;
private final MongoDbConnectorConfig connectorConfig;
public Loader(MongoDbTaskContext taskContext) {
this.taskContext = taskContext;
public Loader(MongoDbConnectorConfig connectorConfig) {
this.connectorConfig = connectorConfig;
}
@Override
public MongoDbOffsetContext load(Map<String, ?> offset) {
var sourceInfo = new SourceInfo(
taskContext.getConnectorConfig(),
ConnectionStrings.replicaSetName(taskContext.getConnectionString()));
var sourceInfo = new SourceInfo(connectorConfig);
if (!booleanOffsetValue(offset, INITIAL_SYNC)) {
var position = positionFromOffset(offset);

View File

@ -10,7 +10,6 @@
import java.util.Objects;
import java.util.Set;
import io.debezium.connector.mongodb.connection.ConnectionStrings;
import io.debezium.pipeline.spi.Partition;
import io.debezium.util.Collect;
@ -55,18 +54,15 @@ public String toString() {
public static class Provider implements Partition.Provider<MongoDbPartition> {
private final MongoDbConnectorConfig connectorConfig;
private final MongoDbTaskContext taskContext;
public Provider(MongoDbTaskContext taskContext) {
this.connectorConfig = taskContext.getConnectorConfig();
this.taskContext = taskContext;
public Provider(MongoDbConnectorConfig connectorConfig) {
this.connectorConfig = connectorConfig;
}
@Override
public Set<MongoDbPartition> getPartitions() {
return Collections.singleton(new MongoDbPartition(
connectorConfig.getLogicalName(),
ConnectionStrings.replicaSetName(taskContext.getConnectionString())));
return Collections.singleton(
new MongoDbPartition(connectorConfig.getLogicalName(), connectorConfig.getReplicaSetName()));
}
}
}
}

View File

@ -37,7 +37,6 @@
import io.debezium.DebeziumException;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.mongodb.connection.ConnectionStrings;
import io.debezium.connector.mongodb.connection.MongoDbConnection;
import io.debezium.connector.mongodb.recordemitter.MongoDbSnapshotRecordEmitter;
import io.debezium.pipeline.ErrorHandler;
@ -214,7 +213,7 @@ private boolean isValidResumeToken(MongoDbPartition partition, BsonDocument toke
private void initSnapshotStartOffsets(MongoDbSnapshotContext snapshotCtx) {
LOGGER.info("Initializing empty Offset context");
snapshotCtx.offset = MongoDbOffsetContext.empty(connectorConfig, ConnectionStrings.replicaSetName(taskContext.getConnectionString()));
snapshotCtx.offset = MongoDbOffsetContext.empty(connectorConfig);
}
private void initSnapshotStartOffsets(MongoDbSnapshotContext snapshotCtx, MongoDbConnection mongo) throws InterruptedException {

View File

@ -213,7 +213,7 @@ else if (offsetContext.lastTimestamp() != null) {
protected MongoDbOffsetContext emptyOffsets(MongoDbConnectorConfig connectorConfig) {
LOGGER.info("Initializing empty Offset context");
return MongoDbOffsetContext.empty(connectorConfig, ConnectionStrings.replicaSetName(taskContext.getConnectionString()));
return MongoDbOffsetContext.empty(connectorConfig);
}
/**

View File

@ -67,8 +67,8 @@ public MongoDbConnectorConfig getConnectorConfig() {
/**
* Provides the capture mode used by connector runtime. This value can differ from requested
* configured value as the offets stored might be created by a different capture mode.
* In this case the configured value is overriden and the mode previously used is restored.
* configured value as the offsets stored might be created by a different capture mode.
* In this case the configured value is overridden and the mode previously used is restored.
*
* @return effectively used capture mode
*/
@ -77,11 +77,10 @@ public CaptureMode getCaptureMode() {
}
public ConnectionString getConnectionString() {
return connectorConfig.getTaskConnectionString()
.orElse(connectionContext.connectionString());
return connectorConfig.getConnectionString();
}
public MongoDbConnection connect(MongoDbConnection.ErrorHandler errorHandler) {
return connectionContext.connect(getConnectionString(), filters, errorHandler);
return connectionContext.connect(filters, errorHandler);
}
}

View File

@ -78,7 +78,7 @@ public final class SourceInfo extends BaseSourceInfo {
private static final BsonTimestamp INITIAL_TIMESTAMP = new BsonTimestamp();
private static final Position INITIAL_POSITION = new Position(INITIAL_TIMESTAMP, null, null);
public boolean initialSync = false;
private final String replicaSetName;
private final MongoDbConnectorConfig connectorConfig;
/**
* Id of collection the current event applies to. May be {@code null} after noop events,
@ -144,9 +144,9 @@ static final class SessionTransactionId {
}
}
public SourceInfo(MongoDbConnectorConfig connectorConfig, String replicaSetName) {
public SourceInfo(MongoDbConnectorConfig connectorConfig) {
super(connectorConfig);
this.replicaSetName = replicaSetName;
this.connectorConfig = connectorConfig;
}
CollectionId collectionId() {
@ -223,7 +223,7 @@ private void noEvent(String resumeToken) {
private void noEvent(Position position) {
String namespace = "";
long wallTime = 0L;
onEvent(CollectionId.parse(replicaSetName, namespace), position, wallTime);
onEvent(CollectionId.parse(connectorConfig.getReplicaSetName(), namespace), position, wallTime);
}
public void changeStreamEvent(ChangeStreamDocument<BsonDocument> changeStreamEvent) {
@ -240,7 +240,7 @@ public void changeStreamEvent(ChangeStreamDocument<BsonDocument> changeStreamEve
}
}
onEvent(CollectionId.parse(replicaSetName, namespace), position, wallTime);
onEvent(CollectionId.parse(connectorConfig.getReplicaSetName(), namespace), position, wallTime);
}
private void onEvent(CollectionId collectionId, Position position, long wallTime) {
@ -301,7 +301,7 @@ protected String database() {
}
String replicaSetName() {
return replicaSetName;
return connectorConfig.getReplicaSetName();
}
long wallTime() {
@ -310,6 +310,6 @@ long wallTime() {
@Override
public String toString() {
return "SourceInfo [initialSyncReplicaSets=" + initialSync + ", collectionId=" + collectionId + ", position=" + position + "]";
return "SourceInfo [initialSync=" + initialSync + ", collectionId=" + collectionId + ", position=" + position + "]";
}
}

View File

@ -141,4 +141,8 @@ public MongoClient connect() {
public MongoDbConnection connect(ConnectionString connectionString, Filters filters, MongoDbConnection.ErrorHandler errorHandler) {
return new MongoDbConnection(connectionString, clientFactory, connectorConfig, filters, errorHandler);
}
public MongoDbConnection connect(Filters filters, MongoDbConnection.ErrorHandler errorHandler) {
return connect(connectionString(), filters, errorHandler);
}
}

View File

@ -74,8 +74,6 @@ protected void useConfiguration(Configuration config) {
private void initialize() {
context = new MongoDbTaskContext(config);
assertThat(context.getConnectionContext().connectionSeed()).isNotEmpty();
// Get a connection...
var connectionString = context.getConnectionContext().connectionString();
connection = context.getConnectionContext().connect(connectionString, context.filters(), TestHelper.connectionErrorHandler(3));
connection = context.connect(TestHelper.connectionErrorHandler(3));
}
}

View File

@ -59,18 +59,20 @@ public void beforeEach() {
private SourceInfo createSourceInfo() {
var config = new MongoDbConnectorConfig(Configuration.create()
.with(MongoDbConnectorConfig.CONNECTION_STRING, "mongodb://localhost:2017/?replicaSet=" + REPLICA_SET_NAME)
.with(CommonConnectorConfig.TOPIC_PREFIX, "serverX")
.build());
return new SourceInfo(config, REPLICA_SET_NAME);
return new SourceInfo(config);
}
private void createOffsetContext() {
var config = new MongoDbConnectorConfig(Configuration.create()
.with(MongoDbConnectorConfig.CONNECTION_STRING, "mongodb://localhost:2017/?replicaSet=" + REPLICA_SET_NAME)
.with(CommonConnectorConfig.TOPIC_PREFIX, "serverX")
.build());
context = MongoDbOffsetContext.empty(config, REPLICA_SET_NAME);
context = MongoDbOffsetContext.empty(config);
source = context.sourceInfo();
}