DBZ-3500 Incremental snapshots for Db2

Jiri Pechanec 2021-05-10 12:57:02 +02:00 committed by Gunnar Morling
parent 90738f53a6
commit cbaf13841f
2 changed files with 28 additions and 5 deletions
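
This change makes table-identifier parsing for incremental snapshots configurable: SQL Server-style connectors name tables as catalog.schema.table, while Db2 uses schema.table with no catalog part, so a new useCatalogBeforeSchema flag is threaded through IncrementalSnapshotContext down to TableId.parse(). A minimal sketch of the difference, assuming the semantics of Debezium's io.debezium.relational.TableId.parse(String, boolean); the class name and the printed values are illustrative assumptions, not output taken from this commit:

    import io.debezium.relational.TableId;

    public class TableIdParseSketch {
        public static void main(String[] args) {
            // Three-part ids are unambiguous; for a two-part id the flag
            // decides whether the leading part is a catalog or a schema.
            TableId catalogFirst = TableId.parse("DB2INST1.CUSTOMERS", true);
            TableId schemaFirst = TableId.parse("DB2INST1.CUSTOMERS", false);
            System.out.println(catalogFirst.catalog() + " | " + catalogFirst.schema()); // DB2INST1 | null
            System.out.println(schemaFirst.catalog() + " | " + schemaFirst.schema());   // null | DB2INST1
        }
    }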

View File

@@ -59,6 +59,7 @@ public class IncrementalSnapshotContext<T> {
     // State to be stored and recovered from offsets
     private final Queue<T> dataCollectionsToSnapshot = new LinkedList<>();
+    private final boolean useCatalogBeforeSchema;

     /**
      * The PK of the last record that was passed to Kafka Connect. In case of
      * connector restart the start of the first chunk will be populated from it.
@@ -72,6 +73,14 @@ public class IncrementalSnapshotContext<T> {
      */
     private Object[] maximumKey;
+
+    public IncrementalSnapshotContext() {
+        this(true);
+    }
+
+    public IncrementalSnapshotContext(boolean useCatalogBeforeSchema) {
+        this.useCatalogBeforeSchema = useCatalogBeforeSchema;
+    }

     public boolean openWindow(String id) {
         if (!id.startsWith(currentChunkId)) {
             LOGGER.info("Arrived request to open window with id = '{}', expected = '{}', request ignored", id, currentChunkId);
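
For context, a hedged sketch of how a connector could choose the parsing mode when creating the context; the Db2 wiring and the names below are assumptions, not code from this commit:

    import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
    import io.debezium.relational.TableId;

    public class ContextConstructionSketch {
        public static void main(String[] args) {
            // A Db2-style connector opts out of catalog-first parsing;
            // the no-arg constructor preserves the old default (true).
            IncrementalSnapshotContext<TableId> db2Context = new IncrementalSnapshotContext<>(false);
            IncrementalSnapshotContext<TableId> defaultContext = new IncrementalSnapshotContext<>();
        }
    }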
@@ -147,13 +156,15 @@ private void addTablesIdsToSnapshot(List<T> dataCollectionIds) {
     @SuppressWarnings("unchecked")
     public List<T> addDataCollectionNamesToSnapshot(List<String> dataCollectionIds) {
-        final List<T> newDataCollectionIds = dataCollectionIds.stream().map(x -> (T) TableId.parse(x)).collect(Collectors.toList());
+        final List<T> newDataCollectionIds = dataCollectionIds.stream()
+                .map(x -> (T) TableId.parse(x, useCatalogBeforeSchema))
+                .collect(Collectors.toList());
         addTablesIdsToSnapshot(newDataCollectionIds);
         return newDataCollectionIds;
     }

-    public static <U> IncrementalSnapshotContext<U> load(Map<String, ?> offsets, Class<U> clazz) {
-        final IncrementalSnapshotContext<U> context = new IncrementalSnapshotContext<>();
+    public static <U> IncrementalSnapshotContext<U> load(Map<String, ?> offsets, boolean useCatalogBeforeSchema, Class<U> clazz) {
+        final IncrementalSnapshotContext<U> context = new IncrementalSnapshotContext<>(useCatalogBeforeSchema);
         final String lastEventSentKeyStr = (String) offsets.get(EVENT_PRIMARY_KEY);
         context.chunkEndPosition = (lastEventSentKeyStr != null)
@@ -171,6 +182,10 @@ public static <U> IncrementalSnapshotContext<U> load(Map<String, ?> offsets, Class<U> clazz) {
         return context;
     }

+    public static <U> IncrementalSnapshotContext<U> load(Map<String, ?> offsets, Class<U> clazz) {
+        return load(offsets, true, clazz);
+    }
+
     public void sendEvent(Object[] key) {
         lastEventKeySent = key;
     }
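
The restart path mirrors the constructor change: table ids stored in the offsets are re-parsed with the connector's flag, and the new two-argument overload keeps existing callers source-compatible by defaulting to the old behavior. A sketch assuming the signatures shown above; the restore helper itself is hypothetical:

    import java.util.Map;
    import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
    import io.debezium.relational.TableId;

    public class LoadSketch {
        // Hypothetical helper: restore snapshot state from stored connector
        // offsets. Db2-style connectors would pass false; the two-argument
        // overload is equivalent to passing true.
        static IncrementalSnapshotContext<TableId> restore(Map<String, ?> offsets, boolean useCatalogBeforeSchema) {
            return IncrementalSnapshotContext.load(offsets, useCatalogBeforeSchema, TableId.class);
        }
    }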

View File

@@ -87,8 +87,8 @@ protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCo
                 continue;
             }
             dataRecords.forEach(record -> {
-                final int id = ((Struct) record.key()).getInt32("pk");
-                final int value = ((Struct) record.value()).getStruct("after").getInt32("aa");
+                final int id = ((Struct) record.key()).getInt32(pkFieldName());
+                final int value = ((Struct) record.value()).getStruct("after").getInt32(valueFieldName());
                 dbChanges.put(id, value);
             });
             if (recordConsumer != null) {
@@ -105,6 +105,14 @@ protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCo
         return dbChanges;
     }

+    protected String valueFieldName() {
+        return "aa";
+    }
+
+    protected String pkFieldName() {
+        return "pk";
+    }
+
     protected void sendAdHocSnapshotSignal() throws SQLException {
         try (final JdbcConnection connection = databaseConnection()) {
             connection.execute(
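
The two hooks above exist so that database-specific test subclasses can supply their own column names instead of the hard-coded "pk"/"aa". A hypothetical Db2-flavoured override; the subclass and base-class names are assumptions (this commit does not include the Db2 test itself), and the upper-case values reflect Db2 folding unquoted identifiers to upper case:

    // Hypothetical subclass, illustrative only:
    public class Db2IncrementalSnapshotIT extends AbstractIncrementalSnapshotTest {
        @Override
        protected String pkFieldName() {
            return "PK"; // Db2 reports unquoted identifiers in upper case
        }

        @Override
        protected String valueFieldName() {
            return "AA";
        }
    }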