From 7a80c9dae88f56d06eae1407374528a3124afbfc Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Thu, 4 Jan 2024 14:28:26 +0100 Subject: [PATCH] DBZ-7098 Converting engine should honor unsupported tombstone flag --- .../embedded/ConvertingEngineBuilder.java | 71 ++++++++++++------- 1 file changed, 45 insertions(+), 26 deletions(-) diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/ConvertingEngineBuilder.java b/debezium-embedded/src/main/java/io/debezium/embedded/ConvertingEngineBuilder.java index 7cd40c776..717ab6944 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/ConvertingEngineBuilder.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/ConvertingEngineBuilder.java @@ -95,34 +95,53 @@ private static boolean isFormat(Class> format1, return format1 == format2; } + private class ConvertingChangeConsumer implements ChangeConsumer { + + private final ChangeConsumer handler; + + private ConvertingChangeConsumer(ChangeConsumer handler) { + this.handler = handler; + } + + @Override + public void handleBatch(List records, RecordCommitter committer) throws InterruptedException { + handler.handleBatch(records.stream() + .map(x -> toFormat.apply(x)) + .collect(Collectors.toList()), + new RecordCommitter() { + + @Override + public void markProcessed(R record) throws InterruptedException { + committer.markProcessed(fromFormat.apply(record)); + } + + @Override + public void markBatchFinished() throws InterruptedException { + committer.markBatchFinished(); + } + + @Override + public void markProcessed(R record, DebeziumEngine.Offsets sourceOffsets) + throws InterruptedException { + committer.markProcessed(fromFormat.apply(record), sourceOffsets); + } + + @Override + public DebeziumEngine.Offsets buildOffsets() { + return committer.buildOffsets(); + } + }); + } + + @Override + public boolean supportsTombstoneEvents() { + return handler.supportsTombstoneEvents(); + } + } + @Override public Builder notifying(ChangeConsumer handler) { - delegate.notifying( - (records, committer) -> handler.handleBatch(records.stream() - .map(x -> toFormat.apply(x)) - .collect(Collectors.toList()), - new RecordCommitter() { - - @Override - public void markProcessed(R record) throws InterruptedException { - committer.markProcessed(fromFormat.apply(record)); - } - - @Override - public void markBatchFinished() throws InterruptedException { - committer.markBatchFinished(); - } - - @Override - public void markProcessed(R record, DebeziumEngine.Offsets sourceOffsets) throws InterruptedException { - committer.markProcessed(fromFormat.apply(record), sourceOffsets); - } - - @Override - public DebeziumEngine.Offsets buildOffsets() { - return committer.buildOffsets(); - } - })); + delegate.notifying(new ConvertingChangeConsumer(handler)); return this; }