DBZ-2456 fixes after rebase

This commit is contained in:
Kaushik Iyer 2020-10-01 19:22:18 +05:30 committed by Jiri Pechanec
parent edcfc747ee
commit 7d9e8fdc46
9 changed files with 17 additions and 14 deletions

View File

@ -17,6 +17,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.ConnectException;
import org.bson.BsonTimestamp;
@ -342,7 +343,7 @@ private void createDataEventsForReplicaSet(ChangeEventSourceContext sourceContex
LOGGER.info("Beginning snapshot of '{}' at {}", rsName, rsOffsetContext.getOffset());
final Set<CollectionId> collections = determineAllowedDataCollectionsForSnapshot(primaryClient.collections());
final List<CollectionId> collections = determineDataCollectionsToBeSnapshotted(primaryClient.collections()).collect(Collectors.toList());
snapshotProgressListener.monitoredDataCollectionsDetermined(collections);
if (connectionContext.maxNumberOfCopyThreads() > 1) {
// Since multiple copy threads are to be used, create a thread pool and initiate the copy.

View File

@ -966,6 +966,7 @@ record = records.recordsForTopic("mongo.dbit.fieldnamedop").get(1);
}
@Test
@FixFor("DBZ-2456")
public void shouldSelectivelySnapshot() throws InterruptedException {
config = TestHelper.getConfiguration().edit()
.with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)

View File

@ -255,7 +255,7 @@ protected void execute() {
final List<TableId> tablesToSnapshotSchemaAfterUnlock = new ArrayList<>();
Set<TableId> lockedTables = Collections.emptySet();
final Set<String> snapshotAllowedTables = context.getConnectorConfig().getSnapshotAllowedTables();
final Set<String> snapshotAllowedTables = context.getConnectorConfig().getDataCollectionsToBeSnapshotted();
final Predicate<TableId> isAllowedForSnapshot = tableId -> snapshotAllowedTables.size() == 0
|| snapshotAllowedTables.stream().anyMatch(s -> tableId.identifier().matches(s));
try {
@ -408,7 +408,6 @@ protected void execute() {
else {
logger.info("\t '{}' is not added among known tables", id);
}
//
if (filters.tableFilter().and(isAllowedForSnapshot).test(id)) {
capturedTableIds.add(id);
logger.info("\t including '{}' for further processing", id);

View File

@ -1159,6 +1159,7 @@ record = s2recs.get(0);
}
@Test
@FixFor("DBZ-2456")
public void shouldAllowForSelectiveSnapshot() throws InterruptedException {
TestHelper.execute(SETUP_TABLES_STMT);
Configuration.Builder configBuilder = TestHelper.defaultConfig()

View File

@ -371,6 +371,7 @@ public void testBlacklistColumn() throws Exception {
}
@Test
@FixFor("DBZ-2456")
public void shouldSelectivelySnapshotTables() throws SQLException, InterruptedException {
connection.execute(
"CREATE TABLE table_a (id int, name varchar(30), amount integer primary key(id))",

View File

@ -312,7 +312,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
.withValidation(Field::isNonNegativeInteger);
public static final Field SNAPSHOT_MODE_TABLES = Field.create("snapshot.include.collection.list")
.withDisplayName("Snapshot Mode include Data Collection")
.withDisplayName("Snapshot mode include data collection")
.withType(Type.LIST)
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
@ -553,7 +553,7 @@ public Set<Envelope.Operation> getSkippedOps() {
}
}
public Set<String> getSnapshotAllowedTables() {
public Set<String> getDataCollectionsToBeSnapshotted() {
return Optional.ofNullable(config.getString(SNAPSHOT_MODE_TABLES))
.map(tables -> Strings.setOf(tables, Function.identity()))
.orElseGet(Collections::emptySet);

View File

@ -6,8 +6,9 @@
package io.debezium.pipeline.source;
import java.time.Duration;
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -84,15 +85,14 @@ public SnapshotResult execute(ChangeEventSourceContext context) throws Interrupt
}
}
protected <T extends DataCollectionId> Set<T> determineAllowedDataCollectionsForSnapshot(final Set<T> allDataCollections) {
final Set<String> snapshotAllowedTables = connectorConfig.getSnapshotAllowedTables();
if (snapshotAllowedTables.size() == 0) {
return allDataCollections;
protected <T extends DataCollectionId> Stream<T> determineDataCollectionsToBeSnapshotted(final Collection<T> allDataCollections) {
final Set<String> snapshotAllowedDataCollections = connectorConfig.getDataCollectionsToBeSnapshotted();
if (snapshotAllowedDataCollections.size() == 0) {
return allDataCollections.stream();
}
else {
return allDataCollections.stream()
.filter(dataCollectionId -> snapshotAllowedTables.stream().anyMatch(s -> dataCollectionId.identifier().matches(s)))
.collect(Collectors.toSet());
.filter(dataCollectionId -> snapshotAllowedDataCollections.stream().anyMatch(s -> dataCollectionId.identifier().matches(s)));
}
}

View File

@ -186,7 +186,7 @@ private Set<TableId> sort(Set<TableId> capturedTables) throws Exception {
}
private void determineCapturedTables(RelationalSnapshotContext ctx) throws Exception {
Set<TableId> allTableIds = determineAllowedDataCollectionsForSnapshot(getAllTableIds(ctx));
Set<TableId> allTableIds = determineDataCollectionsToBeSnapshotted(getAllTableIds(ctx)).collect(Collectors.toSet());
Set<TableId> capturedTables = new HashSet<>();

View File

@ -1138,7 +1138,7 @@ Must not be used with `collection.include.list`.
|[[mongodb-property-snapshot-include-collection-list]]<<mongodb-property-snapshot-include-collection-list, `snapshot.include.collection.list`>>
| All collections specified in `collection.include.list`
|An optional, comma-separated list of regular expressions that match names of schemas specified in `table.include.list` for which you *want* to take the snapshot.
|An optional, comma-separated list of regular expressions that match names of schemas specified in `collection.include.list` for which you *want* to take the snapshot.
|[[mongodb-property-field-blacklist]]
[[mongodb-property-field-exclude-list]]<<mongodb-property-field-exclude-list, `field.exclude{zwsp}.list`>>