DBZ-628 Using common properties for MongoDB connector

This commit is contained in:
Gunnar Morling 2018-02-19 21:31:04 +01:00 committed by Jiri Pechanec
parent e895fff2ac
commit 69df7c4ade
2 changed files with 9 additions and 51 deletions

View File

@ -103,33 +103,6 @@ public class MongoDbConnectorConfig extends CommonConnectorConfig {
.withDescription("Maximum number of threads used to perform an intial sync of the collections in a replica set. "
+ "Defaults to 1.");
public static final Field MAX_QUEUE_SIZE = Field.create("max.queue.size")
.withDisplayName("Change event buffer size")
.withType(Type.INT)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDefault(2048)
.withValidation(MongoDbConnectorConfig::validateMaxQueueSize)
.withDescription("Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. Defaults to 2048, and should always be larger than the maximum batch size.");
public static final Field MAX_BATCH_SIZE = Field.create("max.batch.size")
.withDisplayName("Change event batch size")
.withType(Type.INT)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDefault(1024)
.withValidation(Field::isPositiveInteger)
.withDescription("Maximum size of each batch of source records. Defaults to 1024.");
public static final Field POLL_INTERVAL_MS = Field.create("poll.interval.ms")
.withDisplayName("Poll interval (ms)")
.withType(Type.LONG)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDefault(TimeUnit.SECONDS.toMillis(1))
.withValidation(Field::isPositiveInteger)
.withDescription("Frequency in milliseconds to wait after processing no events for new change events to appear. Defaults to 1 second (1000 ms).");
public static final Field CONNECT_BACKOFF_INITIAL_DELAY_MS = Field.create("connect.backoff.initial.delay.ms")
.withDisplayName("Initial delay before reconnection (ms)")
.withType(Type.LONG)
@ -228,8 +201,9 @@ public class MongoDbConnectorConfig extends CommonConnectorConfig {
public static Field.Set ALL_FIELDS = Field.setOf(USER, PASSWORD, HOSTS, LOGICAL_NAME,
SSL_ENABLED, SSL_ALLOW_INVALID_HOSTNAMES,
MAX_COPY_THREADS, MAX_QUEUE_SIZE, MAX_BATCH_SIZE,
POLL_INTERVAL_MS,
MAX_COPY_THREADS, CommonConnectorConfig.MAX_QUEUE_SIZE,
CommonConnectorConfig.MAX_BATCH_SIZE,
CommonConnectorConfig.POLL_INTERVAL_MS,
MAX_FAILED_CONNECTIONS,
CONNECT_BACKOFF_INITIAL_DELAY_MS,
CONNECT_BACKOFF_MAX_DELAY_MS,
@ -252,7 +226,7 @@ protected static ConfigDef configDef() {
CONNECT_BACKOFF_MAX_DELAY_MS, MAX_FAILED_CONNECTIONS, AUTO_DISCOVER_MEMBERS,
SSL_ENABLED, SSL_ALLOW_INVALID_HOSTNAMES);
Field.group(config, "Events", DATABASE_WHITELIST, DATABASE_BLACKLIST, COLLECTION_WHITELIST, COLLECTION_BLACKLIST, CommonConnectorConfig.TOMBSTONES_ON_DELETE);
Field.group(config, "Connector", MAX_COPY_THREADS, MAX_QUEUE_SIZE, MAX_BATCH_SIZE, POLL_INTERVAL_MS);
Field.group(config, "Connector", MAX_COPY_THREADS, CommonConnectorConfig.MAX_QUEUE_SIZE, CommonConnectorConfig.MAX_BATCH_SIZE, CommonConnectorConfig.POLL_INTERVAL_MS);
return config;
}
@ -270,23 +244,6 @@ private static int validateHosts(Configuration config, Field field, ValidationOu
return count;
}
private static int validateMaxQueueSize(Configuration config, Field field, ValidationOutput problems) {
int maxQueueSize = config.getInteger(field);
int maxBatchSize = config.getInteger(MAX_BATCH_SIZE);
int count = 0;
if (maxQueueSize <= 0) {
maxBatchSize = maxQueueSize / 2;
problems.accept(field, maxQueueSize, "A positive queue size is required");
++count;
}
if (maxQueueSize <= maxBatchSize) {
maxBatchSize = maxQueueSize / 2;
problems.accept(field, maxQueueSize, "Must be larger than the maximum batch size");
++count;
}
return count;
}
private static int validateCollectionBlacklist(Configuration config, Field field, ValidationOutput problems) {
String whitelist = config.getString(COLLECTION_WHITELIST);
String blacklist = config.getString(COLLECTION_BLACKLIST);

View File

@ -5,7 +5,6 @@
*/
package io.debezium.connector.mongodb;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
@ -111,11 +110,13 @@ public void start(Map<String, String> props) {
"Unable to start MongoDB connector task since no replica sets were found at " + hosts);
}
MongoDbConnectorConfig connectorConfig = new MongoDbConnectorConfig(config);
// Set up the task record queue ...
this.queue = new ChangeEventQueue.Builder<SourceRecord>()
.pollInterval(Duration.ofMillis(config.getLong(MongoDbConnectorConfig.POLL_INTERVAL_MS)))
.maxBatchSize(config.getInteger(MongoDbConnectorConfig.MAX_BATCH_SIZE))
.maxQueueSize(config.getInteger(MongoDbConnectorConfig.MAX_QUEUE_SIZE))
.pollInterval(connectorConfig.getPollInterval())
.maxBatchSize(connectorConfig.getMaxBatchSize())
.maxQueueSize(connectorConfig.getMaxQueueSize())
.loggingContextSupplier(this::getLoggingContext)
.build();