diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java index 2bce62901..54b066bec 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java @@ -195,6 +195,27 @@ public boolean isFullUpdate() { protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 0; + public static final Field LOGICAL_NAME = Field.create("mongodb.name") + .withDisplayName("Namespace") + .withType(Type.STRING) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 0)) + .withWidth(Width.MEDIUM) + .withImportance(Importance.HIGH) + .required() + .withDescription("Unique name that identifies the MongoDB replica set or cluster and all recorded offsets, and " + + "that is used as a prefix for all schemas and topics. " + + "Each distinct MongoDB installation should have a separate namespace and monitored by " + + "at most one Debezium connector."); + + public static final Field CONNECTION_STRING = Field.create("mongodb.connection.string") + .withDisplayName("Connection String") + .withType(Type.STRING) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 1)) + .withWidth(Width.MEDIUM) + .withImportance(Importance.HIGH) + .withValidation(MongoDbConnectorConfig::validateConnectionString) + .withDescription("Database connection string."); + /** * The comma-separated list of hostname and port pairs (in the form 'host' or 'host:port') of the MongoDB servers in the * replica set. @@ -209,17 +230,18 @@ public boolean isFullUpdate() { .withDescription("The hostname and port pairs (in the form 'host' or 'host:port') " + "of the MongoDB server(s) in the replica set."); - public static final Field LOGICAL_NAME = Field.create("mongodb.name") - .withDisplayName("Namespace") - .withType(Type.STRING) - .withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 0)) - .withWidth(Width.MEDIUM) - .withImportance(Importance.HIGH) - .required() - .withDescription("Unique name that identifies the MongoDB replica set or cluster and all recorded offsets, and " - + "that is used as a prefix for all schemas and topics. " - + "Each distinct MongoDB installation should have a separate namespace and monitored by " - + "at most one Debezium connector."); + public static final Field AUTO_DISCOVER_MEMBERS = Field.create("mongodb.members.auto.discover") + .withDisplayName("Auto-discovery") + .withType(Type.BOOLEAN) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 3)) + .withWidth(Width.SHORT) + .withImportance(Importance.LOW) + .withDefault(true) + .withValidation(Field::isBoolean, MongoDbConnectorConfig::validateAutodiscovery) + .withDescription("Specifies whether the addresses in 'hosts' are seeds that should be " + + "used to discover all members of the cluster or replica set ('true'), " + + "or whether the address(es) in 'hosts' should be used as is ('false'). " + + "The default is 'true'."); public static final Field USER = Field.create("mongodb.user") .withDisplayName("User") @@ -237,23 +259,6 @@ public boolean isFullUpdate() { .withImportance(Importance.HIGH) .withDescription("Password to be used when connecting to MongoDB, if necessary."); - public static final Field CONNECTION_STRING = Field.create("mongodb.connection.string") - .withDisplayName("Connection String") - .withType(Type.STRING) - .withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 1)) - .withWidth(Width.MEDIUM) - .withImportance(Importance.HIGH) - .withValidation(MongoDbConnectorConfig::validateConnectionString) - .withDescription("Database connection string."); - public static final Field AUTH_SOURCE = Field.create("mongodb.authsource") - .withDisplayName("Credentials Database") - .withType(Type.STRING) - .withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 3)) - .withWidth(Width.SHORT) - .withImportance(Importance.MEDIUM) - .withDefault(ReplicaSetDiscovery.ADMIN_DATABASE_NAME) - .withDescription("Database containing user credentials."); - public static final Field MONGODB_POLL_INTERVAL_MS = Field.create("mongodb.poll.interval.ms") .withDisplayName("Replica membership poll interval (ms)") .withType(Type.LONG) @@ -284,6 +289,15 @@ public boolean isFullUpdate() { .withValidation(Field::isBoolean) .withDescription("Whether invalid host names are allowed when using SSL. If true the connection will not prevent man-in-the-middle attacks"); + public static final Field CONNECT_TIMEOUT_MS = Field.create("mongodb.connect.timeout.ms") + .withDisplayName("Connect Timeout MS") + .withType(Type.INT) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 0)) + .withWidth(Width.SHORT) + .withImportance(Importance.LOW) + .withDefault(10_000) + .withDescription("The connection timeout, given in milliseconds. Defaults to 10 seconds (10,000 ms)."); + public static final Field CONNECT_BACKOFF_INITIAL_DELAY_MS = Field.create("connect.backoff.initial.delay.ms") .withDisplayName("Initial delay before reconnection (ms)") .withType(Type.LONG) @@ -306,6 +320,15 @@ public boolean isFullUpdate() { .withDescription( "The maximum delay when trying to reconnect to a primary after a connection cannot be made or when no primary is available, given in milliseconds. Defaults to 120 second (120,000 ms)."); + public static final Field AUTH_SOURCE = Field.create("mongodb.authsource") + .withDisplayName("Credentials Database") + .withType(Type.STRING) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 3)) + .withWidth(Width.SHORT) + .withImportance(Importance.MEDIUM) + .withDefault(ReplicaSetDiscovery.ADMIN_DATABASE_NAME) + .withDescription("Database containing user credentials."); + public static final Field MAX_FAILED_CONNECTIONS = Field.create("connect.max.attempts") .withDisplayName("Connection attempt limit") .withType(Type.INT) @@ -320,18 +343,23 @@ public boolean isFullUpdate() { + CONNECT_BACKOFF_MAX_DELAY_MS + "' results in " + "just over 20 minutes of attempts before failing."); - public static final Field AUTO_DISCOVER_MEMBERS = Field.create("mongodb.members.auto.discover") - .withDisplayName("Auto-discovery") - .withType(Type.BOOLEAN) - .withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 3)) + public static final Field SERVER_SELECTION_TIMEOUT_MS = Field.create("mongodb.server.selection.timeout.ms") + .withDisplayName("Server selection timeout MS") + .withType(Type.INT) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 5)) .withWidth(Width.SHORT) .withImportance(Importance.LOW) - .withDefault(true) - .withValidation(Field::isBoolean, MongoDbConnectorConfig::validateAutodiscovery) - .withDescription("Specifies whether the addresses in 'hosts' are seeds that should be " - + "used to discover all members of the cluster or replica set ('true'), " - + "or whether the address(es) in 'hosts' should be used as is ('false'). " - + "The default is 'true'."); + .withDefault(30_000) + .withDescription("The server selection timeout, given in milliseconds. Defaults to 10 seconds (10,000 ms)."); + + public static final Field SOCKET_TIMEOUT_MS = Field.create("mongodb.socket.timeout.ms") + .withDisplayName("Socket timeout MS") + .withType(Type.INT) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 6)) + .withWidth(Width.SHORT) + .withImportance(Importance.LOW) + .withDefault(0) + .withDescription("The socket timeout, given in milliseconds. Defaults to 0 ms."); /** * A comma-separated list of regular expressions that match the databases to be monitored. @@ -419,17 +447,6 @@ public boolean isFullUpdate() { " where databaseName and collectionName may contain the wildcard (*) which matches any characters," + " the colon character (:) is used to determine rename mapping of field."); - public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode") - .withDisplayName("Snapshot mode") - .withEnum(SnapshotMode.class, SnapshotMode.INITIAL) - .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 0)) - .withWidth(Width.SHORT) - .withImportance(Importance.LOW) - .withDescription("The criteria for running a snapshot upon startup of the connector. " - + "Options include: " - + "'initial' (the default) to specify the connector should always perform an initial sync when required; " - + "'never' to specify the connector should never perform an initial sync "); - public static final Field CAPTURE_MODE = Field.create("capture.mode") .withDisplayName("Capture mode") .withEnum(CaptureMode.class, CaptureMode.CHANGE_STREAMS_UPDATE_FULL) @@ -441,38 +458,22 @@ public boolean isFullUpdate() { + "'change_streams' to capture changes via MongoDB Change Streams, update events do not contain full documents; " + "'change_streams_update_full' (the default) to capture changes via MongoDB Change Streams, update events contain full documents"); - public static final Field CONNECT_TIMEOUT_MS = Field.create("mongodb.connect.timeout.ms") - .withDisplayName("Connect Timeout MS") - .withType(Type.INT) - .withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 0)) - .withWidth(Width.SHORT) - .withImportance(Importance.LOW) - .withDefault(10_000) - .withDescription("The connection timeout, given in milliseconds. Defaults to 10 seconds (10,000 ms)."); - - public static final Field SERVER_SELECTION_TIMEOUT_MS = Field.create("mongodb.server.selection.timeout.ms") - .withDisplayName("Server selection timeout MS") - .withType(Type.INT) - .withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 5)) - .withWidth(Width.SHORT) - .withImportance(Importance.LOW) - .withDefault(30_000) - .withDescription("The server selection timeout, given in milliseconds. Defaults to 10 seconds (10,000 ms)."); - - public static final Field SOCKET_TIMEOUT_MS = Field.create("mongodb.socket.timeout.ms") - .withDisplayName("Socket timeout MS") - .withType(Type.INT) - .withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 6)) - .withWidth(Width.SHORT) - .withImportance(Importance.LOW) - .withDefault(0) - .withDescription("The socket timeout, given in milliseconds. Defaults to 0 ms."); - protected static final Field TASK_ID = Field.create("mongodb.task.id") .withDescription("Internal use only") .withValidation(Field::isInteger) .withInvisibleRecommender(); + public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode") + .withDisplayName("Snapshot mode") + .withEnum(SnapshotMode.class, SnapshotMode.INITIAL) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 0)) + .withWidth(Width.SHORT) + .withImportance(Importance.LOW) + .withDescription("The criteria for running a snapshot upon startup of the connector. " + + "Options include: " + + "'initial' (the default) to specify the connector should always perform an initial sync when required; " + + "'never' to specify the connector should never perform an initial sync "); + public static final Field SNAPSHOT_FILTER_QUERY_BY_COLLECTION = Field.create("snapshot.collection.filter.overrides") .withDisplayName("Snapshot mode") .withType(Type.STRING)