DBZ-720 Passing change event emitter directly to dispatch methods;

The indirection of going through a supplier wasn't really necessary
This commit is contained in:
Gunnar Morling 2018-07-10 11:48:42 +02:00 committed by Jiri Pechanec
parent a565361e56
commit 657650c581

View File

@ -6,7 +6,6 @@
package io.debezium.pipeline; package io.debezium.pipeline;
import java.util.Objects; import java.util.Objects;
import java.util.function.Supplier;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
@ -66,7 +65,7 @@ public EventDispatcher(TopicSelector<T> topicSelector, DatabaseSchema<T> schema,
* receiving coordinator creates {@link SourceRecord}s for all emitted events and passes them to the given * receiving coordinator creates {@link SourceRecord}s for all emitted events and passes them to the given
* {@link ChangeEventCreator} for converting them into data change events. * {@link ChangeEventCreator} for converting them into data change events.
*/ */
public void dispatchDataChangeEvent(T dataCollectionId, Supplier<ChangeRecordEmitter> changeRecordEmitter, ChangeEventCreator changeEventCreator) throws InterruptedException { public void dispatchDataChangeEvent(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter, ChangeEventCreator changeEventCreator) throws InterruptedException {
// TODO Handle Heartbeat // TODO Handle Heartbeat
// TODO Handle JMX // TODO Handle JMX
@ -83,19 +82,19 @@ public void dispatchDataChangeEvent(T dataCollectionId, Supplier<ChangeRecordEmi
throw new IllegalArgumentException("No metadata registered for captured table " + dataCollectionId); throw new IllegalArgumentException("No metadata registered for captured table " + dataCollectionId);
} }
changeRecordEmitter.get().emitChangeRecords( changeRecordEmitter.emitChangeRecords(
dataCollectionSchema, dataCollectionSchema,
new ChangeRecordReceiver(dataCollectionId, changeEventCreator, dataCollectionSchema) new ChangeRecordReceiver(dataCollectionId, changeEventCreator, dataCollectionSchema)
); );
} }
public void dispatchSchemaChangeEvent(T dataCollectionId, Supplier<SchemaChangeEventEmitter> schemaChangeEventEmitter) throws InterruptedException { public void dispatchSchemaChangeEvent(T dataCollectionId, SchemaChangeEventEmitter schemaChangeEventEmitter) throws InterruptedException {
if(!filter.isIncluded(dataCollectionId)) { if(!filter.isIncluded(dataCollectionId)) {
LOGGER.trace("Skipping data change event for {}", dataCollectionId); LOGGER.trace("Skipping data change event for {}", dataCollectionId);
return; return;
} }
schemaChangeEventEmitter.get().emitSchemaChangeEvent(new SchemaChangeEventReceiver()); schemaChangeEventEmitter.emitSchemaChangeEvent(new SchemaChangeEventReceiver());
} }
private final class ChangeRecordReceiver implements ChangeRecordEmitter.Receiver { private final class ChangeRecordReceiver implements ChangeRecordEmitter.Receiver {