DBZ-5123 data-collection regex snapshot signal

This commit is contained in:
Chris Lee 2022-05-20 10:07:40 -06:00 committed by Jiri Pechanec
parent 3160987bc5
commit 2a8456d21b
8 changed files with 57 additions and 10 deletions

View File

@ -71,6 +71,7 @@ Chris Baumbauer
Chris Collingwood
Chris Cranford
Chris Egerton
Chris Lee
Chris Riccomini
Christian Posta
Christian Stein

View File

@ -90,17 +90,34 @@ protected String topicName() {
@Override
protected String tableName() {
return TableId.parse(DATABASE.qualifiedTableName("a")).toQuotedString('`');
return tableNameId().toQuotedString('`');
}
@Override
protected String signalTableName() {
return TableId.parse(DATABASE.qualifiedTableName("debezium_signal")).toQuotedString('`');
return tableNameId("debezium_signal").toQuotedString('`');
}
@Override
protected String tableName(String table) {
return TableId.parse(DATABASE.qualifiedTableName(table)).toQuotedString('`');
return tableNameId(table).toQuotedString('`');
}
@Override
protected String tableDataCollectionId() {
// Unquoted fully-qualified id of the default test table "a",
// as expected in the signal's data-collections payload.
return tableNameId().toString();
}
// Returns the unquoted fully-qualified name of the given table,
// suitable for use as a data-collection entry in a snapshot signal.
private String dataCollectionName(String table) {
return tableNameId(table).toString();
}
// Convenience overload: id of the default test table "a".
private TableId tableNameId() {
return tableNameId("a");
}
// Parses the database-qualified name of the given table into a TableId.
private TableId tableNameId(String table) {
return TableId.parse(DATABASE.qualifiedTableName(table));
}
@Override
@ -203,7 +220,7 @@ public void tableWithDatetime() throws Exception {
waitForConnectorToStart();
waitForAvailableRecords(5, TimeUnit.SECONDS);
sendAdHocSnapshotSignal(tableName("a_dt"));
sendAdHocSnapshotSignal(dataCollectionName("a_dt"));
final int expectedRecordCount = ROWS;
final Map<Integer, List<Object>> dbChanges = consumeMixedWithIncrementalSnapshot(
@ -252,7 +269,7 @@ public void tableWithZeroDate() throws Exception {
waitForConnectorToStart();
waitForAvailableRecords(5, TimeUnit.SECONDS);
sendAdHocSnapshotSignal(tableName("a_date"));
sendAdHocSnapshotSignal(dataCollectionName("a_date"));
final int expectedRecordCount = 1;
final Map<Integer, List<Integer>> dbChanges = consumeMixedWithIncrementalSnapshot(

View File

@ -17,6 +17,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
@ -411,7 +412,21 @@ private void nextDataCollection(P partition) {
public void addDataCollectionNamesToSnapshot(P partition, List<String> dataCollectionIds, OffsetContext offsetContext) throws InterruptedException {
context = (IncrementalSnapshotContext<T>) offsetContext.getIncrementalSnapshotContext();
boolean shouldReadChunk = !context.snapshotRunning();
final List<T> newDataCollectionIds = context.addDataCollectionNamesToSnapshot(dataCollectionIds);
List<String> expandedDataCollectionIds = dataCollectionIds
.stream()
.flatMap(x -> databaseSchema
.tableIds()
.stream()
.map(TableId::identifier)
.filter(t -> Pattern.compile(x).matcher(t).matches()))
.collect(Collectors.toList());
if (expandedDataCollectionIds.size() > dataCollectionIds.size()) {
LOGGER.info("Data-collections to snapshot have been expanded from {} to {}", dataCollectionIds, expandedDataCollectionIds);
}
final List<T> newDataCollectionIds = context.addDataCollectionNamesToSnapshot(expandedDataCollectionIds);
if (shouldReadChunk) {
progressListener.snapshotStarted(partition);
progressListener.monitoredDataCollectionsDetermined(partition, newDataCollectionIds);

View File

@ -470,6 +470,19 @@ public void snapshotPreceededBySchemaChange() throws Exception {
}
}
@Test
public void snapshotWithRegexDataCollections() throws Exception {
// Verifies that a data-collections entry may be a regular expression:
// ".*" should expand to every captured table and still snapshot all rows.
populateTable();
startConnector();
sendAdHocSnapshotSignal(".*");
final int expectedRecordCount = ROW_COUNT;
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
// Keys are 1-based pks, values are the 0-based inserted payloads.
for (int i = 0; i < expectedRecordCount; i++) {
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
}
}
@Override
protected int getMaximumEnqueuedRecordCount() {
return ROW_COUNT * 3;

View File

@ -501,7 +501,7 @@ See the next section for more details.
|`data-collections`
|_N/A_
| An array of qualified names of tables to be snapshotted. +
| An array of comma-separated regular expressions that match fully-qualified names of tables to be snapshotted. +
The format of the names is the same as for xref:#{context}-property-signal-data-collection[signal.data.collection] configuration option.
|===

View File

@ -37,7 +37,7 @@ Currently, you can request only `incremental` snapshots.
|`data-collections`
|_N/A_
| An array that contains the fully-qualified names of the {data-collection} to be snapshotted. +
| An array that contains regular expressions matching the fully-qualified names of the {data-collection} to be snapshotted. +
The format of the names is the same as for the `signal.data.collection` configuration option.
|===

View File

@ -114,7 +114,7 @@ Rather, during the snapshot, {prodname} generates its own `id` string as a water
|`data-collections`
|A required component of the `data` field of a signal that specifies an array of {data-collection} names to include in the snapshot. +
The array lists {data-collection}s by their fully-qualified names, using the same format as you use to specify the name of the connector's signaling {data-collection} in the xref:{context}-property-signal-data-collection[`signal.data.collection`] configuration property.
The array lists regular expressions that match {data-collection}s by their fully-qualified names, using the same format as you use to specify the name of the connector's signaling {data-collection} in the xref:{context}-property-signal-data-collection[`signal.data.collection`] configuration property.
|`incremental`
|An optional `type` component of the `data` field of a signal that specifies the kind of snapshot operation to run. +

View File

@ -136,4 +136,5 @@ markallanson,Mark Allanson
AlexMiroshnikov,Alexey Miroshnikov
roeselert,Timo Roeseler
troeselereos,Timo Roeseler
Himanshu-LT,Himanshu Mishra
Himanshu-LT,Himanshu Mishra
Chrisss93,Chris Lee