From 83b3b54be068ed25e61cf8ad9cf1691eff005e28 Mon Sep 17 00:00:00 2001 From: ming luo Date: Fri, 20 Jan 2023 21:27:51 -0500 Subject: [PATCH] DBZ-6033 debezium server pulsar support non-default tenant and namespace --- COPYRIGHT.txt | 1 + .../debezium/server/pulsar/PulsarChangeConsumer.java | 11 +++++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/COPYRIGHT.txt b/COPYRIGHT.txt index 087dd848e..03dfcd2d2 100644 --- a/COPYRIGHT.txt +++ b/COPYRIGHT.txt @@ -299,6 +299,7 @@ Mike Graham Mike Kamornikov Mikhail Dubrovin Mincong Huang +Ming Luo Mohamed Pudukulathan Mohammad Yousuf Minhaj Zia Moira Tagle diff --git a/debezium-server/debezium-server-pulsar/src/main/java/io/debezium/server/pulsar/PulsarChangeConsumer.java b/debezium-server/debezium-server-pulsar/src/main/java/io/debezium/server/pulsar/PulsarChangeConsumer.java index 6ec91f851..a51a0d09a 100644 --- a/debezium-server/debezium-server-pulsar/src/main/java/io/debezium/server/pulsar/PulsarChangeConsumer.java +++ b/debezium-server/debezium-server-pulsar/src/main/java/io/debezium/server/pulsar/PulsarChangeConsumer.java @@ -59,6 +59,12 @@ public interface ProducerBuilder { @ConfigProperty(name = PROP_PREFIX + "null.key", defaultValue = "default") String nullKey; + @ConfigProperty(name = PROP_PREFIX + "tenant", defaultValue = "public") + String pulsarTenant; + + @ConfigProperty(name = PROP_PREFIX + "namespace", defaultValue = "default") + String pulsarNamespace; + @PostConstruct void connect() { final Config config = ConfigProvider.getConfig(); @@ -92,17 +98,18 @@ void close() { } private Producer createProducer(String topicName, Object value) { + final String topicFullName = pulsarTenant + "/" + pulsarNamespace + "/" + topicName; try { if (value instanceof String) { return pulsarClient.newProducer(Schema.STRING) .loadConf(producerConfig) - .topic(topicName) + .topic(topicFullName) .create(); } else { return pulsarClient.newProducer() .loadConf(producerConfig) - .topic(topicName) + .topic(topicFullName) .create(); } }