From aa1272cc219e7789681ba963469be52f9efa1f24 Mon Sep 17 00:00:00 2001 From: Gunnar Morling Date: Wed, 11 Jul 2018 10:43:33 +0200 Subject: [PATCH] DBZ-720 Re-using single instance of streaming event receiver in EventDispatcher --- .../io/debezium/pipeline/EventDispatcher.java | 30 ++++++++----------- .../pipeline/spi/ChangeRecordEmitter.java | 2 +- .../RelationalChangeRecordEmitter.java | 10 +++---- .../io/debezium/relational/TableSchema.java | 13 ++++++-- .../relational/TableSchemaBuilder.java | 2 +- .../debezium/schema/DataCollectionSchema.java | 1 + 6 files changed, 30 insertions(+), 28 deletions(-) diff --git a/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java b/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java index 3941a9ad6..1cd75b30e 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java @@ -45,10 +45,13 @@ public class EventDispatcher { private final HistorizedDatabaseSchema historizedSchema; private final ChangeEventQueue queue; private final DataCollectionFilter filter; + private final ChangeEventCreator changeEventCreator; + private final StreamingChangeRecordReceiver receiver; public EventDispatcher(TopicSelector topicSelector, DatabaseSchema schema, ChangeEventQueue queue, - DataCollectionFilter filter) { + DataCollectionFilter filter, + ChangeEventCreator changeEventCreator) { this.topicSelector = topicSelector; this.schema = schema; this.historizedSchema = schema instanceof HistorizedDatabaseSchema @@ -56,16 +59,18 @@ public EventDispatcher(TopicSelector topicSelector, DatabaseSchema schema, : null; this.queue = queue; this.filter = filter; + this.changeEventCreator = changeEventCreator; + this.receiver = new StreamingChangeRecordReceiver(); } /** * Dispatches one or more {@link DataChangeEvent}s. If the given data collection is included in the currently * captured set of collections, the given emitter will be invoked, so it can emit one or more events (in the common * case, one event will be emitted, but e.g. in case of PK updates, it may be a deletion and a creation event). The - * receiving coordinator creates {@link SourceRecord}s for all emitted events and passes them to the given + * receiving coordinator creates {@link SourceRecord}s for all emitted events and passes them to this dispatcher's * {@link ChangeEventCreator} for converting them into data change events. */ - public void dispatchDataChangeEvent(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter, ChangeEventCreator changeEventCreator) throws InterruptedException { + public void dispatchDataChangeEvent(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter) throws InterruptedException { // TODO Handle Heartbeat // TODO Handle JMX @@ -84,7 +89,7 @@ public void dispatchDataChangeEvent(T dataCollectionId, ChangeRecordEmitter chan changeRecordEmitter.emitChangeRecords( dataCollectionSchema, - new ChangeRecordReceiver(dataCollectionId, changeEventCreator, dataCollectionSchema) + receiver ); } @@ -97,28 +102,17 @@ public void dispatchSchemaChangeEvent(T dataCollectionId, SchemaChangeEventEmitt schemaChangeEventEmitter.emitSchemaChangeEvent(new SchemaChangeEventReceiver()); } - private final class ChangeRecordReceiver implements ChangeRecordEmitter.Receiver { - - private final T dataCollectionId; - private final ChangeEventCreator changeEventCreator; - private final DataCollectionSchema dataCollectionSchema; - - private ChangeRecordReceiver(T dataCollectionId, ChangeEventCreator changeEventCreator, - DataCollectionSchema dataCollectionSchema) { - this.dataCollectionId = dataCollectionId; - this.changeEventCreator = changeEventCreator; - this.dataCollectionSchema = dataCollectionSchema; - } + private final class StreamingChangeRecordReceiver implements ChangeRecordEmitter.Receiver { @Override - public void changeRecord(Operation operation, Object key, Struct value, OffsetContext offsetContext) throws InterruptedException { + public void changeRecord(DataCollectionSchema dataCollectionSchema, Operation operation, Object key, Struct value, OffsetContext offsetContext) throws InterruptedException { Objects.requireNonNull(key, "key must not be null"); Objects.requireNonNull(value, "key must not be null"); LOGGER.trace( "Received change record for {} operation on key {}", operation, key); Schema keySchema = dataCollectionSchema.keySchema(); - String topicName = topicSelector.topicNameFor(dataCollectionId); + String topicName = topicSelector.topicNameFor((T) dataCollectionSchema.id()); SourceRecord record = new SourceRecord(offsetContext.getPartition(), offsetContext.getOffset(), topicName, null, keySchema, key, dataCollectionSchema.getEnvelopeSchema().schema(), value); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/spi/ChangeRecordEmitter.java b/debezium-core/src/main/java/io/debezium/pipeline/spi/ChangeRecordEmitter.java index 5da5baf96..4168680b9 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/spi/ChangeRecordEmitter.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/spi/ChangeRecordEmitter.java @@ -20,6 +20,6 @@ public interface ChangeRecordEmitter { void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) throws InterruptedException; public interface Receiver { - void changeRecord(Operation operation, Object key, Struct value, OffsetContext offsetManager) throws InterruptedException; + void changeRecord(DataCollectionSchema schema, Operation operation, Object key, Struct value, OffsetContext offsetManager) throws InterruptedException; } } diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java b/debezium-core/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java index c2959756a..965fc908b 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java @@ -57,7 +57,7 @@ private void emitCreateRecord(Receiver receiver, TableSchema tableSchema, Operat Struct newValue = tableSchema.valueFromColumnData(newColumnValues); Struct envelope = tableSchema.getEnvelopeSchema().create(newValue, offsetContext.getSourceInfo(), clock.currentTimeInMillis()); - receiver.changeRecord(operation, newKey, envelope, offsetContext); + receiver.changeRecord(tableSchema, operation, newKey, envelope, offsetContext); } private void emitUpdateRecord(Receiver receiver, TableSchema tableSchema, Operation operation) @@ -74,15 +74,15 @@ private void emitUpdateRecord(Receiver receiver, TableSchema tableSchema, Operat // regular update if (Objects.equals(oldKey, newKey)) { Struct envelope = tableSchema.getEnvelopeSchema().update(oldValue, newValue, offsetContext.getSourceInfo(), clock.currentTimeInMillis()); - receiver.changeRecord(operation, newKey, envelope, offsetContext); + receiver.changeRecord(tableSchema, operation, newKey, envelope, offsetContext); } // PK update -> emit as delete and re-insert with new key else { Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, offsetContext.getSourceInfo(), clock.currentTimeInMillis()); - receiver.changeRecord(Operation.DELETE, oldKey, envelope, offsetContext); + receiver.changeRecord(tableSchema, Operation.DELETE, oldKey, envelope, offsetContext); envelope = tableSchema.getEnvelopeSchema().create(newValue, offsetContext.getSourceInfo(), clock.currentTimeInMillis()); - receiver.changeRecord(operation, oldKey, envelope, offsetContext); + receiver.changeRecord(tableSchema, operation, oldKey, envelope, offsetContext); } } @@ -93,7 +93,7 @@ private void emitDeleteRecord(Receiver receiver, TableSchema tableSchema, Operat Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues); Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, offsetContext.getSourceInfo(), clock.currentTimeInMillis()); - receiver.changeRecord(operation, oldKey, envelope, offsetContext); + receiver.changeRecord(tableSchema, operation, oldKey, envelope, offsetContext); } protected abstract Operation getOperation(); diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchema.java b/debezium-core/src/main/java/io/debezium/relational/TableSchema.java index a50edba19..8635a6fdd 100644 --- a/debezium-core/src/main/java/io/debezium/relational/TableSchema.java +++ b/debezium-core/src/main/java/io/debezium/relational/TableSchema.java @@ -49,6 +49,7 @@ @Immutable public class TableSchema implements DataCollectionSchema { + private final TableId id; private final Schema keySchema; private final Envelope envelopeSchema; private final Schema valueSchema; @@ -59,6 +60,7 @@ public class TableSchema implements DataCollectionSchema { * Create an instance with the specified {@link Schema}s for the keys and values, and the functions that generate the * key and value for a given row of data. * + * @param id the id of the table corresponding to this schema * @param keySchema the schema for the primary key; may be null * @param keyGenerator the function that converts a row into a single key object for Kafka Connect; may not be null but may * return nulls @@ -66,9 +68,9 @@ public class TableSchema implements DataCollectionSchema { * @param valueGenerator the function that converts a row into a single value object for Kafka Connect; may not be null but * may return nulls */ - public TableSchema(Schema keySchema, Function keyGenerator, - Envelope envelopeSchema, - Schema valueSchema, Function valueGenerator) { + public TableSchema(TableId id, Schema keySchema, Function keyGenerator, + Envelope envelopeSchema, Schema valueSchema, Function valueGenerator) { + this.id = id; this.keySchema = keySchema; this.envelopeSchema = envelopeSchema; this.valueSchema = valueSchema; @@ -76,6 +78,11 @@ public TableSchema(Schema keySchema, Function keyGenerator, this.valueGenerator = valueGenerator != null ? valueGenerator : (row) -> null; } + @Override + public TableId id() { + return id; + } + /** * Get the {@link Schema} that represents the table's columns, excluding those that make up the {@link #keySchema()}. * diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java index c27dbde85..df63a209c 100644 --- a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java +++ b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java @@ -123,7 +123,7 @@ public TableSchema create(String schemaPrefix, String envelopSchemaName, Table t Function valueGenerator = createValueGenerator(valSchema, tableId, table.columns(), filter, mappers); // And the table schema ... - return new TableSchema(keySchema, keyGenerator, envelope, valSchema, valueGenerator); + return new TableSchema(tableId, keySchema, keyGenerator, envelope, valSchema, valueGenerator); } /** diff --git a/debezium-core/src/main/java/io/debezium/schema/DataCollectionSchema.java b/debezium-core/src/main/java/io/debezium/schema/DataCollectionSchema.java index 27d9a9f16..8f853035a 100644 --- a/debezium-core/src/main/java/io/debezium/schema/DataCollectionSchema.java +++ b/debezium-core/src/main/java/io/debezium/schema/DataCollectionSchema.java @@ -11,6 +11,7 @@ public interface DataCollectionSchema { + DataCollectionId id(); Schema keySchema(); Envelope getEnvelopeSchema(); }