diff --git a/COPYRIGHT.txt b/COPYRIGHT.txt index a22d98bac..8827e26fc 100644 --- a/COPYRIGHT.txt +++ b/COPYRIGHT.txt @@ -540,3 +540,4 @@ Miguel Angel Sotomayor Stephen Clarkson Gurps Bassi Massimo Fortunat +Vincenzo Santonastaso diff --git a/debezium-core/src/main/java/io/debezium/pipeline/signal/channels/KafkaSignalChannel.java b/debezium-core/src/main/java/io/debezium/pipeline/signal/channels/KafkaSignalChannel.java index 35c501fdb..13dad9f22 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/signal/channels/KafkaSignalChannel.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/signal/channels/KafkaSignalChannel.java @@ -83,6 +83,15 @@ public class KafkaSignalChannel implements SignalChannelReader { .withDefault(0) .withValidation(Field::isNonNegativeInteger); + public static final Field GROUP_ID = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + + "kafka.groupId") + .withDisplayName("Consumer group id for the signal topic") + .withType(ConfigDef.Type.STRING) + .withWidth(ConfigDef.Width.MEDIUM) + .withImportance(ConfigDef.Importance.LOW) + .withDescription("Consumer group id for the signal topic") + .withDefault("kafka-signal"); + private Optional processSignal(ConsumerRecord record) { if (!connectorName.equals(record.key())) { @@ -133,7 +142,6 @@ public String name() { @Override public void init(CommonConnectorConfig connectorConfig) { - String signalName = "kafka-signal"; this.connectorName = connectorConfig.getLogicalName(); Configuration signalConfig = connectorConfig.getConfig().subset(CONFIGURATION_FIELD_PREFIX_STRING, false) .edit() @@ -141,7 +149,7 @@ public void init(CommonConnectorConfig connectorConfig) { .build(); this.topicName = signalConfig.getString(SIGNAL_TOPIC); this.pollTimeoutMs = Duration.ofMillis(signalConfig.getInteger(SIGNAL_POLL_TIMEOUT_MS)); - Configuration consumerConfig = buildKafkaConfiguration(signalName, signalConfig); + Configuration consumerConfig = buildKafkaConfiguration(signalConfig); this.signalsConsumer = new KafkaConsumer<>(consumerConfig.asProperties()); LOGGER.info("Subscribing to signals topic '{}'", topicName); signalsConsumer.assign(Collect.arrayListOf(new TopicPartition(topicName, 0))); @@ -153,12 +161,12 @@ public void reset(Object reference) { signalsConsumer.seek(new TopicPartition(topicName, 0), ((Long) reference) + 1); } - private static Configuration buildKafkaConfiguration(String signalName, Configuration signalConfig) { + private static Configuration buildKafkaConfiguration(Configuration signalConfig) { return signalConfig.subset(CONSUMER_PREFIX, true).edit() .withDefault(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, signalConfig.getString(BOOTSTRAP_SERVERS)) .withDefault(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()) - .withDefault(ConsumerConfig.GROUP_ID_CONFIG, signalName) + .withDefault(ConsumerConfig.GROUP_ID_CONFIG, signalConfig.getString(GROUP_ID)) .withDefault(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1) // get even the smallest message .withDefault(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false) .withDefault(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000) // readjusted since 0.10.1.0 diff --git a/jenkins-jobs/scripts/config/Aliases.txt b/jenkins-jobs/scripts/config/Aliases.txt index f01206613..bd026a1e5 100644 --- a/jenkins-jobs/scripts/config/Aliases.txt +++ b/jenkins-jobs/scripts/config/Aliases.txt @@ -225,3 +225,4 @@ j2gg0s,Yanjie Wang REMY David,David Remy tyrantlucifer,Chao Tian ryanvanhuuksloot,Ryan van Huuksloot +vsantona, Vincenzo Santonastaso