DBZ-6689 make kafka signal consumer group configurable

This commit is contained in:
“vsantonastaso” 2023-08-11 11:26:02 +02:00 committed by Jiri Pechanec
parent b7e0e1cd2a
commit 64eaf72665
3 changed files with 14 additions and 4 deletions

View File

@ -540,3 +540,4 @@ Miguel Angel Sotomayor
Stephen Clarkson
Gurps Bassi
Massimo Fortunat
Vincenzo Santonastaso

View File

@ -83,6 +83,15 @@ public class KafkaSignalChannel implements SignalChannelReader {
.withDefault(0)
.withValidation(Field::isNonNegativeInteger);
public static final Field GROUP_ID = Field.create(CONFIGURATION_FIELD_PREFIX_STRING
+ "kafka.groupId")
.withDisplayName("Consumer group id for the signal topic")
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.LOW)
.withDescription("Consumer group id for the signal topic")
.withDefault("kafka-signal");
private Optional<SignalRecord> processSignal(ConsumerRecord<String, String> record) {
if (!connectorName.equals(record.key())) {
@ -133,7 +142,6 @@ public String name() {
@Override
public void init(CommonConnectorConfig connectorConfig) {
String signalName = "kafka-signal";
this.connectorName = connectorConfig.getLogicalName();
Configuration signalConfig = connectorConfig.getConfig().subset(CONFIGURATION_FIELD_PREFIX_STRING, false)
.edit()
@ -141,7 +149,7 @@ public void init(CommonConnectorConfig connectorConfig) {
.build();
this.topicName = signalConfig.getString(SIGNAL_TOPIC);
this.pollTimeoutMs = Duration.ofMillis(signalConfig.getInteger(SIGNAL_POLL_TIMEOUT_MS));
Configuration consumerConfig = buildKafkaConfiguration(signalName, signalConfig);
Configuration consumerConfig = buildKafkaConfiguration(signalConfig);
this.signalsConsumer = new KafkaConsumer<>(consumerConfig.asProperties());
LOGGER.info("Subscribing to signals topic '{}'", topicName);
signalsConsumer.assign(Collect.arrayListOf(new TopicPartition(topicName, 0)));
@ -153,12 +161,12 @@ public void reset(Object reference) {
signalsConsumer.seek(new TopicPartition(topicName, 0), ((Long) reference) + 1);
}
private static Configuration buildKafkaConfiguration(String signalName, Configuration signalConfig) {
private static Configuration buildKafkaConfiguration(Configuration signalConfig) {
return signalConfig.subset(CONSUMER_PREFIX, true).edit()
.withDefault(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, signalConfig.getString(BOOTSTRAP_SERVERS))
.withDefault(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString())
.withDefault(ConsumerConfig.GROUP_ID_CONFIG, signalName)
.withDefault(ConsumerConfig.GROUP_ID_CONFIG, signalConfig.getString(GROUP_ID))
.withDefault(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1) // get even the smallest message
.withDefault(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)
.withDefault(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000) // readjusted since 0.10.1.0

View File

@ -225,3 +225,4 @@ j2gg0s,Yanjie Wang
REMY David,David Remy
tyrantlucifer,Chao Tian
ryanvanhuuksloot,Ryan van Huuksloot
vsantona, Vincenzo Santonastaso