DBZ-7098 Converting engine should honor unsupported tombstone flag

This commit is contained in:
Jiri Pechanec 2024-01-04 14:28:26 +01:00
parent 13703c7457
commit 7a80c9dae8

View File

@ -95,34 +95,53 @@ private static boolean isFormat(Class<? extends SerializationFormat<?>> format1,
return format1 == format2; return format1 == format2;
} }
private class ConvertingChangeConsumer implements ChangeConsumer<SourceRecord> {
private final ChangeConsumer<R> handler;
private ConvertingChangeConsumer(ChangeConsumer<R> handler) {
this.handler = handler;
}
@Override
public void handleBatch(List<SourceRecord> records, RecordCommitter<SourceRecord> committer) throws InterruptedException {
handler.handleBatch(records.stream()
.map(x -> toFormat.apply(x))
.collect(Collectors.toList()),
new RecordCommitter<R>() {
@Override
public void markProcessed(R record) throws InterruptedException {
committer.markProcessed(fromFormat.apply(record));
}
@Override
public void markBatchFinished() throws InterruptedException {
committer.markBatchFinished();
}
@Override
public void markProcessed(R record, DebeziumEngine.Offsets sourceOffsets)
throws InterruptedException {
committer.markProcessed(fromFormat.apply(record), sourceOffsets);
}
@Override
public DebeziumEngine.Offsets buildOffsets() {
return committer.buildOffsets();
}
});
}
@Override
public boolean supportsTombstoneEvents() {
return handler.supportsTombstoneEvents();
}
}
@Override @Override
public Builder<R> notifying(ChangeConsumer<R> handler) { public Builder<R> notifying(ChangeConsumer<R> handler) {
delegate.notifying( delegate.notifying(new ConvertingChangeConsumer(handler));
(records, committer) -> handler.handleBatch(records.stream()
.map(x -> toFormat.apply(x))
.collect(Collectors.toList()),
new RecordCommitter<R>() {
@Override
public void markProcessed(R record) throws InterruptedException {
committer.markProcessed(fromFormat.apply(record));
}
@Override
public void markBatchFinished() throws InterruptedException {
committer.markBatchFinished();
}
@Override
public void markProcessed(R record, DebeziumEngine.Offsets sourceOffsets) throws InterruptedException {
committer.markProcessed(fromFormat.apply(record), sourceOffsets);
}
@Override
public DebeziumEngine.Offsets buildOffsets() {
return committer.buildOffsets();
}
}));
return this; return this;
} }