DBZ-1904 Rename queueing methods

This commit is contained in:
Jiri Pechanec 2020-04-30 12:51:20 +02:00 committed by Gunnar Morling
parent 7f23d74d90
commit b9fa634968

View File

@ -110,7 +110,7 @@ public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> t
this.emitTombstonesOnDelete = connectorConfig.isEmitTombstoneOnDelete();
this.inconsistentSchemaHandler = inconsistentSchemaHandler != null ? inconsistentSchemaHandler : this::errorOnMissingSchema;
this.transactionMonitor = new TransactionMonitor(connectorConfig, metadataProvider, this::dispatchTransactionMessage);
this.transactionMonitor = new TransactionMonitor(connectorConfig, metadataProvider, this::enqueueTransactionMessage);
if (customHeartbeat != null) {
heartbeat = customHeartbeat;
}
@ -303,11 +303,11 @@ private void enqueueHeartbeat(SourceRecord record) throws InterruptedException {
queue.enqueue(new DataChangeEvent(record));
}
public void dispatchTransactionMessage(SourceRecord record) throws InterruptedException {
private void enqueueTransactionMessage(SourceRecord record) throws InterruptedException {
queue.enqueue(new DataChangeEvent(record));
}
public void dispatchSchemaChangeMessage(SourceRecord record) throws InterruptedException {
private void enqueueSchemaChangeMessage(SourceRecord record) throws InterruptedException {
queue.enqueue(new DataChangeEvent(record));
}
@ -448,7 +448,7 @@ public void schemaChangeEvent(SchemaChangeEvent event) throws InterruptedExcepti
final Struct value = schemaChangeRecordValue(event);
final SourceRecord record = new SourceRecord(null, event.getOffset(), topicName, partition,
schemaChangeKeySchema, key, schemaChangeValueSchema, value);
dispatchSchemaChangeMessage(record);
enqueueSchemaChangeMessage(record);
}
}
}