DBZ-6033 debezium server pulsar support non-default tenant and namespace

This commit is contained in:
ming luo 2023-01-20 21:27:51 -05:00 committed by Jiri Pechanec
parent e4a94d55dd
commit 83b3b54be0
2 changed files with 10 additions and 2 deletions

View File

@ -299,6 +299,7 @@ Mike Graham
Mike Kamornikov
Mikhail Dubrovin
Mincong Huang
Ming Luo
Mohamed Pudukulathan
Mohammad Yousuf Minhaj Zia
Moira Tagle

View File

@ -59,6 +59,12 @@ public interface ProducerBuilder {
@ConfigProperty(name = PROP_PREFIX + "null.key", defaultValue = "default")
String nullKey;
@ConfigProperty(name = PROP_PREFIX + "tenant", defaultValue = "public")
String pulsarTenant;
@ConfigProperty(name = PROP_PREFIX + "namespace", defaultValue = "default")
String pulsarNamespace;
@PostConstruct
void connect() {
final Config config = ConfigProvider.getConfig();
@ -92,17 +98,18 @@ void close() {
}
private Producer<?> createProducer(String topicName, Object value) {
final String topicFullName = pulsarTenant + "/" + pulsarNamespace + "/" + topicName;
try {
if (value instanceof String) {
return pulsarClient.newProducer(Schema.STRING)
.loadConf(producerConfig)
.topic(topicName)
.topic(topicFullName)
.create();
}
else {
return pulsarClient.newProducer()
.loadConf(producerConfig)
.topic(topicName)
.topic(topicFullName)
.create();
}
}