diff --git a/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java b/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java index 34c456996..3941a9ad6 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java @@ -6,7 +6,6 @@ package io.debezium.pipeline; import java.util.Objects; -import java.util.function.Supplier; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; @@ -66,7 +65,7 @@ public EventDispatcher(TopicSelector topicSelector, DatabaseSchema schema, * receiving coordinator creates {@link SourceRecord}s for all emitted events and passes them to the given * {@link ChangeEventCreator} for converting them into data change events. */ - public void dispatchDataChangeEvent(T dataCollectionId, Supplier changeRecordEmitter, ChangeEventCreator changeEventCreator) throws InterruptedException { + public void dispatchDataChangeEvent(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter, ChangeEventCreator changeEventCreator) throws InterruptedException { // TODO Handle Heartbeat // TODO Handle JMX @@ -83,19 +82,19 @@ public void dispatchDataChangeEvent(T dataCollectionId, Supplier schemaChangeEventEmitter) throws InterruptedException { + public void dispatchSchemaChangeEvent(T dataCollectionId, SchemaChangeEventEmitter schemaChangeEventEmitter) throws InterruptedException { if(!filter.isIncluded(dataCollectionId)) { LOGGER.trace("Skipping data change event for {}", dataCollectionId); return; } - schemaChangeEventEmitter.get().emitSchemaChangeEvent(new SchemaChangeEventReceiver()); + schemaChangeEventEmitter.emitSchemaChangeEvent(new SchemaChangeEventReceiver()); } private final class ChangeRecordReceiver implements ChangeRecordEmitter.Receiver {