DBZ-4013 Skip invalid table

commit 26d0e78023 (parent e1fb3cc0e6)
Author:    Jiri Pechanec
Committed: Gunnar Morling
Date:      2021-09-17 09:04:04 +02:00
3 changed files with 42 additions and 11 deletions

@@ -455,7 +455,7 @@ private static class SystemTablesPredicate implements TableFilter {
         @Override
         public boolean isIncluded(TableId t) {
-            return !(t.schema().toLowerCase().equals("cdc") ||
+            return t.schema() != null && !(t.schema().toLowerCase().equals("cdc") ||
                     t.schema().toLowerCase().equals("sys") ||
                     t.table().toLowerCase().equals("systranschemas"));
         }
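
For context on the one-line fix above: TableId.schema() can be null for a table id that has no schema part, and the old predicate called toLowerCase() on it unconditionally, raising a NullPointerException before the filter could decide anything. With the guard, such tables are simply excluded. A compilable sketch of the same check, using a simplified stand-in for Debezium's TableId (the class below is illustrative, not the real type):

    public class SystemTablesPredicateSketch {
        // Simplified stand-in for io.debezium.relational.TableId; the schema
        // part may be null, which is exactly the case the fix guards against.
        record TableId(String schema, String table) {
        }

        static boolean isIncluded(TableId t) {
            // Without the null check, t.schema().toLowerCase() throws a
            // NullPointerException for a table id that lacks a schema part.
            return t.schema() != null && !(t.schema().toLowerCase().equals("cdc")
                    || t.schema().toLowerCase().equals("sys")
                    || t.table().toLowerCase().equals("systranschemas"));
        }

        public static void main(String[] args) {
            System.out.println(isIncluded(new TableId(null, "orders")));  // false - skipped, no NPE
            System.out.println(isIncluded(new TableId("dbo", "orders"))); // true
            System.out.println(isIncluded(new TableId("cdc", "t")));      // false - system schema
        }
    }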

@@ -216,11 +216,13 @@ protected void readChunk() throws InterruptedException {
                 currentTable = databaseSchema.tableFor(currentTableId);
                 if (currentTable == null) {
                     LOGGER.warn("Schema not found for table '{}', known tables {}", currentTableId, databaseSchema.tableIds());
-                    break;
+                    nextDataCollection();
+                    continue;
                 }
                 if (currentTable.primaryKeyColumns().isEmpty()) {
                     LOGGER.warn("Incremental snapshot for table '{}' skipped cause the table has no primary keys", currentTableId);
-                    break;
+                    nextDataCollection();
+                    continue;
                 }
                 if (!context.maximumKey().isPresent()) {
                     context.maximumKey(jdbcConnection.queryAndMap(buildMaxPrimaryKeyQuery(currentTable), rs -> {
@@ -234,7 +236,7 @@ protected void readChunk() throws InterruptedException {
                     LOGGER.info(
                             "No maximum key returned by the query, incremental snapshotting of table '{}' finished as it is empty",
                             currentTableId);
-                    context.nextDataCollection();
+                    nextDataCollection();
                     continue;
                 }
                 if (LOGGER.isInfoEnabled()) {
@@ -247,10 +249,7 @@ protected void readChunk() throws InterruptedException {
                     LOGGER.info("No data returned by the query, incremental snapshotting of table '{}' finished",
                             currentTableId);
                     tableScanCompleted();
-                    context.nextDataCollection();
-                    if (!context.snapshotRunning()) {
-                        progressListener.snapshotCompleted();
-                    }
+                    nextDataCollection();
                 }
                 else {
                     break;
@@ -269,6 +268,13 @@ protected void readChunk() throws InterruptedException {
         }
     }
 
+    private void nextDataCollection() {
+        context.nextDataCollection();
+        if (!context.snapshotRunning()) {
+            progressListener.snapshotCompleted();
+        }
+    }
+
     @Override
     @SuppressWarnings("unchecked")
     public void addDataCollectionNamesToSnapshot(List<String> dataCollectionIds, OffsetContext offsetContext) throws InterruptedException {
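
The net effect of these hunks: a table that is unknown to the schema or has no primary key no longer terminates the whole incremental snapshot via break; the source logs a warning, advances to the next requested data collection, and the extracted nextDataCollection() helper reports completion exactly once, when the queue drains. A minimal, self-contained sketch of that control flow (the queue contents, table ids, and knownTables lookup are made-up stand-ins, not Debezium APIs):

    import java.util.ArrayDeque;
    import java.util.List;
    import java.util.Queue;
    import java.util.Set;

    public class SkipInvalidTablesSketch {
        // Stand-in for the snapshot context's queue of requested data collections.
        private final Queue<String> dataCollections = new ArrayDeque<>(List.of("invalid1", "s1.a", "invalid2"));
        // Stand-in for databaseSchema.tableFor(...): only these ids resolve.
        private final Set<String> knownTables = Set.of("s1.a");

        void readChunk() {
            while (!dataCollections.isEmpty()) {
                String currentTableId = dataCollections.peek();
                if (!knownTables.contains(currentTableId)) {
                    System.out.println("Schema not found for table '" + currentTableId + "', skipping");
                    nextDataCollection(); // was: break, which abandoned the remaining tables
                    continue;
                }
                System.out.println("Snapshotting '" + currentTableId + "'");
                nextDataCollection();
            }
        }

        // Mirrors the extracted helper: advance the queue and report completion
        // once nothing is left to snapshot.
        private void nextDataCollection() {
            dataCollections.poll();
            if (dataCollections.isEmpty()) {
                System.out.println("Incremental snapshot completed");
            }
        }

        public static void main(String[] args) {
            new SkipInvalidTablesSketch().readChunk();
        }
    }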

@@ -7,6 +7,7 @@
 import java.nio.file.Path;
 import java.sql.SQLException;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -16,6 +17,7 @@
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceConnector;
@@ -118,11 +120,14 @@ protected String pkFieldName() {
         return "pk";
     }
 
-    protected void sendAdHocSnapshotSignal() throws SQLException {
+    protected void sendAdHocSnapshotSignal(String... dataCollectionIds) throws SQLException {
+        final String dataCollectionIdsList = Arrays.stream(dataCollectionIds)
+                .map(x -> '"' + x + '"')
+                .collect(Collectors.joining(", "));
         try (final JdbcConnection connection = databaseConnection()) {
             String query = String.format(
-                    "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [\"%s\"]}')",
-                    signalTableName(), tableDataCollectionId());
+                    "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [%s]}')",
+                    signalTableName(), dataCollectionIdsList);
             logger.info("Sending signal with query {}", query);
             connection.execute(query);
         }
@@ -131,6 +136,10 @@ protected void sendAdHocSnapshotSignal() throws SQLException {
         }
     }
 
+    protected void sendAdHocSnapshotSignal() throws SQLException {
+        sendAdHocSnapshotSignal(tableDataCollectionId());
+    }
+
     protected void startConnector(DebeziumEngine.CompletionCallback callback) {
         startConnector(Function.identity(), callback);
     }
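
The varargs overload lets a test request several data collections in one signal: each id is quoted and joined into the JSON array of the execute-snapshot payload, while the old no-argument form now just delegates with the single default table. A standalone sketch of what gets emitted (the signal-table and table names below are made-up examples):

    import java.util.Arrays;
    import java.util.stream.Collectors;

    public class SignalPayloadSketch {
        public static void main(String[] args) {
            String[] dataCollectionIds = { "invalid1", "s1.a", "invalid2" };
            // Same quote-and-join logic as the test helper above.
            String dataCollectionIdsList = Arrays.stream(dataCollectionIds)
                    .map(x -> '"' + x + '"')
                    .collect(Collectors.joining(", "));
            System.out.println(String.format(
                    "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [%s]}')",
                    "debezium_signal", dataCollectionIdsList));
            // Prints: INSERT INTO debezium_signal VALUES('ad-hoc', 'execute-snapshot', '{"data-collections": ["invalid1", "s1.a", "invalid2"]}')
        }
    }

The invalidTablesInTheList test below sends exactly such a mixed list and asserts that all ROW_COUNT rows of the one valid table still arrive despite the two non-existent names.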
@@ -173,6 +182,22 @@ public void snapshotOnly() throws Exception {
         }
     }
 
+    @Test
+    public void invalidTablesInTheList() throws Exception {
+        Testing.Print.enable();
+
+        populateTable();
+        startConnector();
+        sendAdHocSnapshotSignal("invalid1", tableName(), "invalid2");
+
+        final int expectedRecordCount = ROW_COUNT;
+        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
+        for (int i = 0; i < expectedRecordCount; i++) {
+            Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
+        }
+    }
+
     @Test
     public void inserts() throws Exception {
         Testing.Print.enable();