DBZ-2496 Corrections to code formatting
This commit is contained in:
parent
925c7e35f5
commit
24f375aa3d
@ -5,6 +5,17 @@
|
||||
*/
|
||||
package io.debezium.connector.mongodb;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.common.config.ConfigDef.Importance;
|
||||
import org.apache.kafka.common.config.ConfigDef.Type;
|
||||
import org.apache.kafka.common.config.ConfigDef.Width;
|
||||
|
||||
import io.debezium.config.CommonConnectorConfig;
|
||||
import io.debezium.config.ConfigDefinition;
|
||||
import io.debezium.config.Configuration;
|
||||
@ -13,15 +24,6 @@
|
||||
import io.debezium.config.Field.ValidationOutput;
|
||||
import io.debezium.connector.AbstractSourceInfo;
|
||||
import io.debezium.connector.SourceInfoStructMaker;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.common.config.ConfigDef.Importance;
|
||||
import org.apache.kafka.common.config.ConfigDef.Type;
|
||||
import org.apache.kafka.common.config.ConfigDef.Width;
|
||||
|
||||
/**
|
||||
* The configuration properties.
|
||||
@ -419,14 +421,13 @@ public static SnapshotMode parse(String value, String defaultValue) {
|
||||
.withInvisibleRecommender();
|
||||
|
||||
public static final Field SNAPSHOT_FILTER_QUERY_BY_COLLECTION = Field.create("snapshot.collection.filter.overrides")
|
||||
.withDisplayName("Snapshot mode")
|
||||
.withDisplayName("Snapshot mode")
|
||||
.withType(Type.STRING)
|
||||
.withWidth(Width.LONG)
|
||||
.withImportance(Importance.MEDIUM)
|
||||
.withDescription("This property contains a comma-separated list of <dbName>.<collectionName>, for which "
|
||||
+ " the initial snapshot may be a subset of data present in the data source. The subset would be defined"
|
||||
+ " by mongodb filter query specified as value for property snapshot.collection.filter.override.<dbname>.<collectionName>");
|
||||
|
||||
.withWidth(Width.LONG)
|
||||
.withImportance(Importance.MEDIUM)
|
||||
.withDescription("This property contains a comma-separated list of <dbName>.<collectionName>, for which "
|
||||
+ " the initial snapshot may be a subset of data present in the data source. The subset would be defined"
|
||||
+ " by mongodb filter query specified as value for property snapshot.collection.filter.override.<dbname>.<collectionName>");
|
||||
|
||||
private static final ConfigDefinition CONFIG_DEFINITION = CommonConnectorConfig.CONFIG_DEFINITION.edit()
|
||||
.name("MongoDB")
|
||||
@ -459,8 +460,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
|
||||
FIELD_BLACKLIST,
|
||||
FIELD_EXCLUDE_LIST,
|
||||
FIELD_RENAMES,
|
||||
SNAPSHOT_FILTER_QUERY_BY_COLLECTION
|
||||
)
|
||||
SNAPSHOT_FILTER_QUERY_BY_COLLECTION)
|
||||
.connector(
|
||||
MAX_COPY_THREADS,
|
||||
SNAPSHOT_MODE)
|
||||
@ -534,8 +534,8 @@ protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStruc
|
||||
}
|
||||
}
|
||||
|
||||
public Optional<String> getSnapshotFilterQueryForCollection(CollectionId collectionId){
|
||||
return Optional.ofNullable(getSnapshotFilterQueryByCollection().get(collectionId.dbName() +"."+collectionId.name()));
|
||||
public Optional<String> getSnapshotFilterQueryForCollection(CollectionId collectionId) {
|
||||
return Optional.ofNullable(getSnapshotFilterQueryByCollection().get(collectionId.dbName() + "." + collectionId.name()));
|
||||
}
|
||||
|
||||
public Map<String, String> getSnapshotFilterQueryByCollection() {
|
||||
@ -549,10 +549,10 @@ public Map<String, String> getSnapshotFilterQueryByCollection() {
|
||||
|
||||
for (String collection : collectionList.split(",")) {
|
||||
snapshotFilterQueryByCollection.put(
|
||||
collection,
|
||||
getConfig().getString(
|
||||
new StringBuilder().append(SNAPSHOT_FILTER_QUERY_BY_COLLECTION).append(".")
|
||||
.append(collection).toString()));
|
||||
collection,
|
||||
getConfig().getString(
|
||||
new StringBuilder().append(SNAPSHOT_FILTER_QUERY_BY_COLLECTION).append(".")
|
||||
.append(collection).toString()));
|
||||
}
|
||||
|
||||
return Collections.unmodifiableMap(snapshotFilterQueryByCollection);
|
||||
|
@ -970,13 +970,13 @@ record = records.recordsForTopic("mongo.dbit.fieldnamedop").get(1);
|
||||
public void shouldFilterItemsInCollectionWhileTakingSnapshot() throws Exception {
|
||||
// Use the DB configuration to define the connector's configuration ...
|
||||
config = TestHelper.getConfiguration().edit()
|
||||
.with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)
|
||||
.with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
|
||||
.with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
|
||||
.with(MongoDbConnectorConfig.SNAPSHOT_FILTER_QUERY_BY_COLLECTION,"dbit.simpletons,dbit.restaurants1")
|
||||
.with(MongoDbConnectorConfig.SNAPSHOT_FILTER_QUERY_BY_COLLECTION + "." +"dbit.simpletons", "{ \"_id\": { \"$gt\": 4 } }")
|
||||
.with(MongoDbConnectorConfig.SNAPSHOT_FILTER_QUERY_BY_COLLECTION + "." +"dbit.restaurants1", "{ cuisine: \"American \" }")
|
||||
.build();
|
||||
.with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)
|
||||
.with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
|
||||
.with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
|
||||
.with(MongoDbConnectorConfig.SNAPSHOT_FILTER_QUERY_BY_COLLECTION, "dbit.simpletons,dbit.restaurants1")
|
||||
.with(MongoDbConnectorConfig.SNAPSHOT_FILTER_QUERY_BY_COLLECTION + "." + "dbit.simpletons", "{ \"_id\": { \"$gt\": 4 } }")
|
||||
.with(MongoDbConnectorConfig.SNAPSHOT_FILTER_QUERY_BY_COLLECTION + "." + "dbit.restaurants1", "{ cuisine: \"American \" }")
|
||||
.build();
|
||||
|
||||
// Set up the replication context for connections ...
|
||||
context = new MongoDbTaskContext(config);
|
||||
@ -1006,7 +1006,6 @@ public void shouldFilterItemsInCollectionWhileTakingSnapshot() throws Exception
|
||||
|
||||
stopConnector();
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
Loading…
Reference in New Issue
Block a user