DBZ-720 Re-using single instance of streaming event receiver in EventDispatcher

This commit is contained in:
Gunnar Morling 2018-07-11 10:43:33 +02:00 committed by Jiri Pechanec
parent 657650c581
commit aa1272cc21
6 changed files with 30 additions and 28 deletions

View File

@@ -45,10 +45,13 @@ public class EventDispatcher<T extends DataCollectionId> {
private final HistorizedDatabaseSchema<T> historizedSchema;
private final ChangeEventQueue<Object> queue;
private final DataCollectionFilter<T> filter;
private final ChangeEventCreator changeEventCreator;
private final StreamingChangeRecordReceiver receiver;
public EventDispatcher(TopicSelector<T> topicSelector, DatabaseSchema<T> schema,
ChangeEventQueue<Object> queue,
DataCollectionFilter<T> filter) {
DataCollectionFilter<T> filter,
ChangeEventCreator changeEventCreator) {
this.topicSelector = topicSelector;
this.schema = schema;
this.historizedSchema = schema instanceof HistorizedDatabaseSchema
@@ -56,16 +59,18 @@ public EventDispatcher(TopicSelector<T> topicSelector, DatabaseSchema<T> schema,
: null;
this.queue = queue;
this.filter = filter;
this.changeEventCreator = changeEventCreator;
this.receiver = new StreamingChangeRecordReceiver();
}
/**
* Dispatches one or more {@link DataChangeEvent}s. If the given data collection is included in the currently
* captured set of collections, the given emitter will be invoked, so it can emit one or more events (in the common
* case, one event will be emitted, but e.g. in case of PK updates, it may be a deletion and a creation event). The
* receiving coordinator creates {@link SourceRecord}s for all emitted events and passes them to the given
* receiving coordinator creates {@link SourceRecord}s for all emitted events and passes them to this dispatcher's
* {@link ChangeEventCreator} for converting them into data change events.
*/
public void dispatchDataChangeEvent(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter, ChangeEventCreator changeEventCreator) throws InterruptedException {
public void dispatchDataChangeEvent(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter) throws InterruptedException {
// TODO Handle Heartbeat
// TODO Handle JMX
@@ -84,7 +89,7 @@ public void dispatchDataChangeEvent(T dataCollectionId, ChangeRecordEmitter chan
changeRecordEmitter.emitChangeRecords(
dataCollectionSchema,
new ChangeRecordReceiver(dataCollectionId, changeEventCreator, dataCollectionSchema)
receiver
);
}
@@ -97,28 +102,17 @@ public void dispatchSchemaChangeEvent(T dataCollectionId, SchemaChangeEventEmitt
schemaChangeEventEmitter.emitSchemaChangeEvent(new SchemaChangeEventReceiver());
}
private final class ChangeRecordReceiver implements ChangeRecordEmitter.Receiver {
private final T dataCollectionId;
private final ChangeEventCreator changeEventCreator;
private final DataCollectionSchema dataCollectionSchema;
private ChangeRecordReceiver(T dataCollectionId, ChangeEventCreator changeEventCreator,
DataCollectionSchema dataCollectionSchema) {
this.dataCollectionId = dataCollectionId;
this.changeEventCreator = changeEventCreator;
this.dataCollectionSchema = dataCollectionSchema;
}
private final class StreamingChangeRecordReceiver implements ChangeRecordEmitter.Receiver {
@Override
public void changeRecord(Operation operation, Object key, Struct value, OffsetContext offsetContext) throws InterruptedException {
public void changeRecord(DataCollectionSchema dataCollectionSchema, Operation operation, Object key, Struct value, OffsetContext offsetContext) throws InterruptedException {
Objects.requireNonNull(key, "key must not be null");
Objects.requireNonNull(value, "key must not be null");
LOGGER.trace( "Received change record for {} operation on key {}", operation, key);
Schema keySchema = dataCollectionSchema.keySchema();
String topicName = topicSelector.topicNameFor(dataCollectionId);
String topicName = topicSelector.topicNameFor((T) dataCollectionSchema.id());
SourceRecord record = new SourceRecord(offsetContext.getPartition(), offsetContext.getOffset(),
topicName, null, keySchema, key, dataCollectionSchema.getEnvelopeSchema().schema(), value);

View File

@@ -20,6 +20,6 @@ public interface ChangeRecordEmitter {
void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) throws InterruptedException;
public interface Receiver {
void changeRecord(Operation operation, Object key, Struct value, OffsetContext offsetManager) throws InterruptedException;
void changeRecord(DataCollectionSchema schema, Operation operation, Object key, Struct value, OffsetContext offsetManager) throws InterruptedException;
}
}

View File

@@ -57,7 +57,7 @@ private void emitCreateRecord(Receiver receiver, TableSchema tableSchema, Operat
Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
Struct envelope = tableSchema.getEnvelopeSchema().create(newValue, offsetContext.getSourceInfo(), clock.currentTimeInMillis());
receiver.changeRecord(operation, newKey, envelope, offsetContext);
receiver.changeRecord(tableSchema, operation, newKey, envelope, offsetContext);
}
private void emitUpdateRecord(Receiver receiver, TableSchema tableSchema, Operation operation)
@@ -74,15 +74,15 @@ private void emitUpdateRecord(Receiver receiver, TableSchema tableSchema, Operat
// regular update
if (Objects.equals(oldKey, newKey)) {
Struct envelope = tableSchema.getEnvelopeSchema().update(oldValue, newValue, offsetContext.getSourceInfo(), clock.currentTimeInMillis());
receiver.changeRecord(operation, newKey, envelope, offsetContext);
receiver.changeRecord(tableSchema, operation, newKey, envelope, offsetContext);
}
// PK update -> emit as delete and re-insert with new key
else {
Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, offsetContext.getSourceInfo(), clock.currentTimeInMillis());
receiver.changeRecord(Operation.DELETE, oldKey, envelope, offsetContext);
receiver.changeRecord(tableSchema, Operation.DELETE, oldKey, envelope, offsetContext);
envelope = tableSchema.getEnvelopeSchema().create(newValue, offsetContext.getSourceInfo(), clock.currentTimeInMillis());
receiver.changeRecord(operation, oldKey, envelope, offsetContext);
receiver.changeRecord(tableSchema, operation, oldKey, envelope, offsetContext);
}
}
@@ -93,7 +93,7 @@ private void emitDeleteRecord(Receiver receiver, TableSchema tableSchema, Operat
Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);
Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, offsetContext.getSourceInfo(), clock.currentTimeInMillis());
receiver.changeRecord(operation, oldKey, envelope, offsetContext);
receiver.changeRecord(tableSchema, operation, oldKey, envelope, offsetContext);
}
protected abstract Operation getOperation();

View File

@@ -49,6 +49,7 @@
@Immutable
public class TableSchema implements DataCollectionSchema {
private final TableId id;
private final Schema keySchema;
private final Envelope envelopeSchema;
private final Schema valueSchema;
@@ -59,6 +60,7 @@ public class TableSchema implements DataCollectionSchema {
* Create an instance with the specified {@link Schema}s for the keys and values, and the functions that generate the
* key and value for a given row of data.
*
* @param id the id of the table corresponding to this schema
* @param keySchema the schema for the primary key; may be null
* @param keyGenerator the function that converts a row into a single key object for Kafka Connect; may not be null but may
* return nulls
@@ -66,9 +68,9 @@ public class TableSchema implements DataCollectionSchema {
* @param valueGenerator the function that converts a row into a single value object for Kafka Connect; may not be null but
* may return nulls
*/
public TableSchema(Schema keySchema, Function<Object[], Object> keyGenerator,
Envelope envelopeSchema,
Schema valueSchema, Function<Object[], Struct> valueGenerator) {
public TableSchema(TableId id, Schema keySchema, Function<Object[], Object> keyGenerator,
Envelope envelopeSchema, Schema valueSchema, Function<Object[], Struct> valueGenerator) {
this.id = id;
this.keySchema = keySchema;
this.envelopeSchema = envelopeSchema;
this.valueSchema = valueSchema;
@@ -76,6 +78,11 @@ public TableSchema(Schema keySchema, Function<Object[], Object> keyGenerator,
this.valueGenerator = valueGenerator != null ? valueGenerator : (row) -> null;
}
@Override
public TableId id() {
return id;
}
/**
* Get the {@link Schema} that represents the table's columns, excluding those that make up the {@link #keySchema()}.
*

View File

@@ -123,7 +123,7 @@ public TableSchema create(String schemaPrefix, String envelopSchemaName, Table t
Function<Object[], Struct> valueGenerator = createValueGenerator(valSchema, tableId, table.columns(), filter, mappers);
// And the table schema ...
return new TableSchema(keySchema, keyGenerator, envelope, valSchema, valueGenerator);
return new TableSchema(tableId, keySchema, keyGenerator, envelope, valSchema, valueGenerator);
}
/**

View File

@@ -11,6 +11,7 @@
public interface DataCollectionSchema {
DataCollectionId id();
Schema keySchema();
Envelope getEnvelopeSchema();
}