From 9dac11e1cdbf2f162f534325116c8e852b3a1577 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9Cvsantonastaso=E2=80=9D?=
Date: Mon, 18 Dec 2023 21:56:55 +0100
Subject: [PATCH] DBZ-7164 Support offset commit for the signal topic

---
 COPYRIGHT.txt                                 |   2 +-
 .../debezium/connector/mysql/SignalsIT.java   | 230 ++++++++++++++++++
 .../signal/channels/KafkaSignalChannel.java   |  18 +-
 jenkins-jobs/scripts/config/Aliases.txt       |   4 +-
 4 files changed, 249 insertions(+), 5 deletions(-)
 create mode 100644 debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SignalsIT.java

diff --git a/COPYRIGHT.txt b/COPYRIGHT.txt
index 8e3bd0858..5ce24522e 100644
--- a/COPYRIGHT.txt
+++ b/COPYRIGHT.txt
@@ -498,7 +498,7 @@ V K
 Vadzim Ramanenka
 Vasily Ulianko
 Vedit Firat Arig
-Vincenzo Santoynastaso
+Vincenzo Santonastaso
 Victar Malinouski
 Victor Xiang
 Vivek Wassan
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SignalsIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SignalsIT.java
new file mode 100644
index 000000000..75889a0dd
--- /dev/null
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SignalsIT.java
@@ -0,0 +1,230 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.connector.mysql;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.sql.SQLException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.debezium.config.CommonConnectorConfig;
+import io.debezium.config.Configuration;
+import io.debezium.embedded.AbstractConnectorTest;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.junit.logging.LogInterceptor;
+import io.debezium.kafka.KafkaCluster;
+import io.debezium.pipeline.signal.actions.snapshotting.ExecuteSnapshot;
+import io.debezium.pipeline.signal.channels.KafkaSignalChannel;
+import io.debezium.util.Collect;
+import io.debezium.util.Testing;
+
+public class SignalsIT extends AbstractConnectorTest {
+    protected static final String SERVER_NAME = "is_test";
+    protected static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("file-schema-history-is.txt")
+            .toAbsolutePath();
+    protected final UniqueDatabase DATABASE = new UniqueDatabase(SERVER_NAME, "incremental_snapshot-test").withDbHistoryPath(SCHEMA_HISTORY_PATH);
+    protected static KafkaCluster kafka;
+
+    @BeforeClass
+    public static void startKafka() throws Exception {
+        File dataDir = Testing.Files.createTestingDirectory("signal_cluster");
+        Testing.Files.delete(dataDir);
+        kafka = new KafkaCluster().usingDirectory(dataDir)
+                .deleteDataPriorToStartup(true)
+                .deleteDataUponShutdown(true)
+                .addBrokers(1)
+                .withKafkaConfiguration(Collect.propertiesOf(
+                        "auto.create.topics.enable", "false",
+                        "zookeeper.session.timeout.ms", "20000"))
+                .startup();
+
+        kafka.createTopic("signals_topic", 1, 1);
+    }
+
+    @AfterClass
+    public static void stopKafka() {
+        if (kafka != null) {
+            kafka.shutdown();
+        }
+    }
+
+    @Before
+    public void before() throws SQLException {
+        stopConnector();
+        DATABASE.createAndInitialize();
+        initializeConnectorTestFramework();
+        Testing.Files.delete(SCHEMA_HISTORY_PATH);
+    }
+
+    @After
+    public void after() {
+        try {
+            stopConnector();
+        }
+        finally {
+            Testing.Files.delete(SCHEMA_HISTORY_PATH);
+        }
+    }
+
+    protected Configuration.Builder config() {
+        return DATABASE.defaultConfig()
+                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
+                .with(MySqlConnectorConfig.USER, "mysqluser")
+                .with(MySqlConnectorConfig.PASSWORD, "mysqlpw")
+                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY.getValue())
+                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
+                .with(MySqlConnectorConfig.SIGNAL_DATA_COLLECTION, DATABASE.qualifiedTableName("debezium_signal"))
+                .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 1)
+                .with(MySqlConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)
+                .with(MySqlConnectorConfig.INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES, true)
+                .with(CommonConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE, CommonConnectorConfig.SchemaNameAdjustmentMode.AVRO);
+    }
+
+    @Test
+    public void givenOffsetCommitDisabledAndASignalSentWithConnectorRunning_whenConnectorComesBackUp_thenAllSignalsAreCorrectlyProcessed()
+            throws ExecutionException, InterruptedException {
+        final LogInterceptor logInterceptor = new LogInterceptor(ExecuteSnapshot.class);
+        startConnector(x -> x.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka")
+                .with(KafkaSignalChannel.SIGNAL_TOPIC, getSignalsTopic())
+                .with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList()));
+        assertConnectorIsRunning();
+        sendExecuteSnapshotKafkaSignal("b");
+        waitForAvailableRecords(1000, TimeUnit.MILLISECONDS);
+        Thread.sleep(5000);
+
+        final SourceRecords records = consumeRecordsByTopic(2);
+        assertThat(records.allRecordsInOrder()).hasSize(2);
+        assertThat(logInterceptor.containsMessage("Requested 'INCREMENTAL' snapshot of data collections '[b]'")).isTrue();
+    }
+
+    @Test
+    public void givenOffsetCommitDisabledAndASignalSentWithConnectorDown_whenConnectorComesBackUp_thenNoSignalsProcessed()
+            throws ExecutionException, InterruptedException {
+        final LogInterceptor logInterceptor = new LogInterceptor(ExecuteSnapshot.class);
+        sendExecuteSnapshotKafkaSignal("b");
+        Thread.sleep(5000);
+        startConnector(x -> x.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka")
+                .with(KafkaSignalChannel.SIGNAL_TOPIC, getSignalsTopic())
+                .with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList()));
+        assertConnectorIsRunning();
+        waitForAvailableRecords(1000, TimeUnit.MILLISECONDS);
+
+        final SourceRecords records = consumeRecordsByTopic(2);
+        assertThat(records.allRecordsInOrder()).hasSize(0);
+        assertThat(logInterceptor.countOccurrences("Requested 'INCREMENTAL' snapshot of data collections '[b]'")).isEqualTo(0);
+    }
+
+    @Test
+    public void givenOffsetCommitEnabledAndASignalSentWithConnectorRunning_whenConnectorComesBackUp_thenAllSignalsAreCorrectlyProcessed()
+            throws ExecutionException, InterruptedException {
+        final LogInterceptor logInterceptor = new LogInterceptor(ExecuteSnapshot.class);
+        startConnector(x -> x.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka")
+                .with(KafkaSignalChannel.SIGNAL_TOPIC, getSignalsTopic())
+                .with(KafkaSignalChannel.SIGNAL_CONSUMER_OFFSET_COMMIT_ENABLED, true)
+                .with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList()));
+        assertConnectorIsRunning();
sendExecuteSnapshotKafkaSignal("b"); + waitForAvailableRecords(1000, TimeUnit.MILLISECONDS); + + assertThat(logInterceptor.containsMessage("Requested 'INCREMENTAL' snapshot of data collections '[b]'")).isTrue(); + } + + @Test + public void givenOffsetCommitEnabledAndMultipleSignalsSentWithConnectorRunning_whenConnectorComesBackUp_thenAllSignalsAreCorrectlyProcessed() + throws ExecutionException, InterruptedException { + final LogInterceptor logInterceptor = new LogInterceptor(ExecuteSnapshot.class); + sendExecuteSnapshotKafkaSignal("b"); + startConnector(x -> x.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka") + .with(KafkaSignalChannel.SIGNAL_TOPIC, getSignalsTopic()) + .with(KafkaSignalChannel.SIGNAL_CONSUMER_OFFSET_COMMIT_ENABLED, true) + .with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList())); + assertConnectorIsRunning(); + waitForAvailableRecords(1000, TimeUnit.MILLISECONDS); + + stopConnector(); + Thread.sleep(5000); + + sendExecuteSnapshotKafkaSignal("c"); + startConnector(x -> x.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka") + .with(KafkaSignalChannel.SIGNAL_TOPIC, getSignalsTopic()) + .with(KafkaSignalChannel.SIGNAL_CONSUMER_OFFSET_COMMIT_ENABLED, true) + .with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList())); + assertConnectorIsRunning(); + waitForAvailableRecords(1000, TimeUnit.MILLISECONDS); + + assertThat(logInterceptor.containsMessage("Requested 'INCREMENTAL' snapshot of data collections '[b]'")).isTrue(); + assertThat(logInterceptor.containsMessage("Requested 'INCREMENTAL' snapshot of data collections '[c]'")).isTrue(); + } + + @Test + public void givenOffsetCommitEnabledAndASignalSentWithConnectorNotRunning_whenConnectorComesBackUp_thenAllSignalsAreCorrectlyProcessed() + throws ExecutionException, InterruptedException { + final LogInterceptor logInterceptor = new LogInterceptor(ExecuteSnapshot.class); + sendExecuteSnapshotKafkaSignal("b"); + startConnector(x -> x.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka") + .with(KafkaSignalChannel.SIGNAL_TOPIC, getSignalsTopic()) + .with(KafkaSignalChannel.SIGNAL_CONSUMER_OFFSET_COMMIT_ENABLED, true) + .with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList())); + assertConnectorIsRunning(); + + assertThat(logInterceptor.containsMessage("Requested 'INCREMENTAL' snapshot of data collections '[b]'")).isTrue(); + } + + protected void startConnector(Function custConfig) { + startConnector(custConfig, loggingCompletion()); + } + + protected void startConnector(Function custConfig, + DebeziumEngine.CompletionCallback callback) { + final Configuration config = custConfig.apply(config()).build(); + start(MySqlConnector.class, config, callback); + assertConnectorIsRunning(); + + waitForAvailableRecords(5, TimeUnit.SECONDS); + assertNoRecordsToConsume(); + } + + protected void sendExecuteSnapshotKafkaSignal(final String table) throws ExecutionException, InterruptedException { + String signalValue = String.format( + "{\"type\":\"execute-snapshot\",\"data\": {\"data-collections\": [\"%s\"], \"type\": \"INCREMENTAL\"}}", + table); + sendKafkaSignal(signalValue); + } + + protected String getSignalsTopic() { + return "signals_topic"; + } + + protected void sendKafkaSignal(String signalValue) throws ExecutionException, InterruptedException { + final ProducerRecord executeSnapshotSignal = new ProducerRecord<>(getSignalsTopic(), 0, SERVER_NAME, signalValue); + + final Configuration signalProducerConfig = Configuration.create() + 
+                .withDefault(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.brokerList())
+                .withDefault(ProducerConfig.CLIENT_ID_CONFIG, "signals")
+                .withDefault(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
+                .withDefault(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
+                .build();
+        try (KafkaProducer<String, String> producer = new KafkaProducer<>(signalProducerConfig.asProperties())) {
+            producer.send(executeSnapshotSignal).get();
+        }
+    }
+
+}
diff --git a/debezium-core/src/main/java/io/debezium/pipeline/signal/channels/KafkaSignalChannel.java b/debezium-core/src/main/java/io/debezium/pipeline/signal/channels/KafkaSignalChannel.java
index 13dad9f22..d48eb0c53 100644
--- a/debezium-core/src/main/java/io/debezium/pipeline/signal/channels/KafkaSignalChannel.java
+++ b/debezium-core/src/main/java/io/debezium/pipeline/signal/channels/KafkaSignalChannel.java
@@ -91,6 +91,15 @@ public class KafkaSignalChannel implements SignalChannelReader {
             .withImportance(ConfigDef.Importance.LOW)
             .withDescription("Consumer group id for the signal topic")
             .withDefault("kafka-signal");
+    public static final Field SIGNAL_CONSUMER_OFFSET_COMMIT_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "kafka.consumer.offset.commit.enabled")
+            .withDisplayName("Enable offset commit for the signal topic")
+            .withType(ConfigDef.Type.BOOLEAN)
+            .withDefault(false)
+            .withWidth(ConfigDef.Width.SHORT)
+            .withImportance(ConfigDef.Importance.MEDIUM)
+            .withDescription("Enable the offset commit for the signal topic in order to guarantee at-least-once delivery. "
+                    + "If disabled, only signals received while the consumer is up and running are processed; any signals received while the consumer is down are lost.")
+            .withValidation(Field::isRequired);
 
     private Optional<SignalRecord> processSignal(ConsumerRecord<String, String> record) {
@@ -163,7 +172,7 @@ public void reset(Object reference) {
     }
 
     private static Configuration buildKafkaConfiguration(Configuration signalConfig) {
-        return signalConfig.subset(CONSUMER_PREFIX, true).edit()
+        Configuration.Builder confBuilder = signalConfig.subset(CONSUMER_PREFIX, true).edit()
                 .withDefault(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, signalConfig.getString(BOOTSTRAP_SERVERS))
                 .withDefault(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString())
                 .withDefault(ConsumerConfig.GROUP_ID_CONFIG, signalConfig.getString(GROUP_ID))
@@ -171,7 +180,12 @@ private static Configuration buildKafkaConfiguration(Configuration signalConfig)
                 .withDefault(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)
                 .withDefault(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000) // readjusted since 0.10.1.0
                 .withDefault(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
-                .withDefault(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
+                .withDefault(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        if (signalConfig.getBoolean(SIGNAL_CONSUMER_OFFSET_COMMIT_ENABLED)) {
+            confBuilder.with(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
+                    .with(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        }
+        return confBuilder
                 .build();
     }
 
diff --git a/jenkins-jobs/scripts/config/Aliases.txt b/jenkins-jobs/scripts/config/Aliases.txt
index 92c12c02b..6c76fed0d 100644
--- a/jenkins-jobs/scripts/config/Aliases.txt
+++ b/jenkins-jobs/scripts/config/Aliases.txt
@@ -227,8 +227,8 @@ j2gg0s,Yanjie Wang
 REMY David,David Remy
 tyrantlucifer,Chao Tian
 ryanvanhuuksloot,Ryan van Huuksloot
-vsantona,Vincenzo Santoynastaso
-“vsantonastaso”,Vincenzo Santoynastaso
+vsantona,Vincenzo Santonastaso
+“vsantonastaso”,Vincenzo Santonastaso
 rolevinks,Stein Rolevink
 matan-cohen,Matan Cohen
 BigGillyStyle,Andy Pickler
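
For reference, a minimal usage sketch of the new option (not part of the patch). It mirrors the configuration the tests above apply via startConnector(); the topic name and bootstrap servers are placeholder values, and the connector-specific settings are elided.

    import io.debezium.config.CommonConnectorConfig;
    import io.debezium.config.Configuration;
    import io.debezium.pipeline.signal.channels.KafkaSignalChannel;

    // Enable the Kafka signal channel and commit consumer offsets on the signal
    // topic, so signals sent while the connector is down are processed
    // (at-least-once) once it comes back up.
    Configuration config = Configuration.create()
            // ... connector-specific settings elided ...
            .with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka")
            .with(KafkaSignalChannel.SIGNAL_TOPIC, "signals_topic") // placeholder topic
            .with(KafkaSignalChannel.BOOTSTRAP_SERVERS, "localhost:9092") // placeholder brokers
            .with(KafkaSignalChannel.SIGNAL_CONSUMER_OFFSET_COMMIT_ENABLED, true) // new flag from this patch
            .build();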