diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/ConvertingEngineBuilder.java b/debezium-embedded/src/main/java/io/debezium/embedded/ConvertingEngineBuilder.java index 7cd40c776..717ab6944 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/ConvertingEngineBuilder.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/ConvertingEngineBuilder.java @@ -95,34 +95,53 @@ private static boolean isFormat(Class> format1, return format1 == format2; } + private class ConvertingChangeConsumer implements ChangeConsumer { + + private final ChangeConsumer handler; + + private ConvertingChangeConsumer(ChangeConsumer handler) { + this.handler = handler; + } + + @Override + public void handleBatch(List records, RecordCommitter committer) throws InterruptedException { + handler.handleBatch(records.stream() + .map(x -> toFormat.apply(x)) + .collect(Collectors.toList()), + new RecordCommitter() { + + @Override + public void markProcessed(R record) throws InterruptedException { + committer.markProcessed(fromFormat.apply(record)); + } + + @Override + public void markBatchFinished() throws InterruptedException { + committer.markBatchFinished(); + } + + @Override + public void markProcessed(R record, DebeziumEngine.Offsets sourceOffsets) + throws InterruptedException { + committer.markProcessed(fromFormat.apply(record), sourceOffsets); + } + + @Override + public DebeziumEngine.Offsets buildOffsets() { + return committer.buildOffsets(); + } + }); + } + + @Override + public boolean supportsTombstoneEvents() { + return handler.supportsTombstoneEvents(); + } + } + @Override public Builder notifying(ChangeConsumer handler) { - delegate.notifying( - (records, committer) -> handler.handleBatch(records.stream() - .map(x -> toFormat.apply(x)) - .collect(Collectors.toList()), - new RecordCommitter() { - - @Override - public void markProcessed(R record) throws InterruptedException { - committer.markProcessed(fromFormat.apply(record)); - } - - @Override - public void markBatchFinished() throws InterruptedException { - committer.markBatchFinished(); - } - - @Override - public void markProcessed(R record, DebeziumEngine.Offsets sourceOffsets) throws InterruptedException { - committer.markProcessed(fromFormat.apply(record), sourceOffsets); - } - - @Override - public DebeziumEngine.Offsets buildOffsets() { - return committer.buildOffsets(); - } - })); + delegate.notifying(new ConvertingChangeConsumer(handler)); return this; }