diff --git a/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java b/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java index d6277db85..92962f7b8 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java @@ -110,7 +110,7 @@ public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector t this.emitTombstonesOnDelete = connectorConfig.isEmitTombstoneOnDelete(); this.inconsistentSchemaHandler = inconsistentSchemaHandler != null ? inconsistentSchemaHandler : this::errorOnMissingSchema; - this.transactionMonitor = new TransactionMonitor(connectorConfig, metadataProvider, this::dispatchTransactionMessage); + this.transactionMonitor = new TransactionMonitor(connectorConfig, metadataProvider, this::enqueueTransactionMessage); if (customHeartbeat != null) { heartbeat = customHeartbeat; } @@ -303,11 +303,11 @@ private void enqueueHeartbeat(SourceRecord record) throws InterruptedException { queue.enqueue(new DataChangeEvent(record)); } - public void dispatchTransactionMessage(SourceRecord record) throws InterruptedException { + private void enqueueTransactionMessage(SourceRecord record) throws InterruptedException { queue.enqueue(new DataChangeEvent(record)); } - public void dispatchSchemaChangeMessage(SourceRecord record) throws InterruptedException { + private void enqueueSchemaChangeMessage(SourceRecord record) throws InterruptedException { queue.enqueue(new DataChangeEvent(record)); } @@ -448,7 +448,7 @@ public void schemaChangeEvent(SchemaChangeEvent event) throws InterruptedExcepti final Struct value = schemaChangeRecordValue(event); final SourceRecord record = new SourceRecord(null, event.getOffset(), topicName, partition, schemaChangeKeySchema, key, schemaChangeValueSchema, value); - dispatchSchemaChangeMessage(record); + enqueueSchemaChangeMessage(record); } } }