DBZ-7164 Support offset commit for the signal topic

Authored by “vsantonastaso” on 2023-12-18 21:56:55 +01:00; committed by Fiore Mario Vitale
parent 225277c5c0
commit 9dac11e1cd
4 changed files with 249 additions and 5 deletions

View File

@@ -498,7 +498,7 @@ V K
 Vadzim Ramanenka
 Vasily Ulianko
 Vedit Firat Arig
-Vincenzo Santoynastaso
+Vincenzo Santonastaso
 Victar Malinouski
 Victor Xiang
 Vivek Wassan

View File

@@ -0,0 +1,230 @@
/*
 * Copyright Debezium Authors.
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package io.debezium.connector.mysql;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.engine.DebeziumEngine;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.kafka.KafkaCluster;
import io.debezium.pipeline.signal.actions.snapshotting.ExecuteSnapshot;
import io.debezium.pipeline.signal.channels.KafkaSignalChannel;
import io.debezium.util.Collect;
import io.debezium.util.Testing;

/**
 * Integration tests for the Kafka signal channel, covering the consumer
 * offset commit behaviour introduced with DBZ-7164.
 */
public class SignalsIT extends AbstractConnectorTest {

    protected static final String SERVER_NAME = "is_test";
    protected static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("file-schema-history-is.txt")
            .toAbsolutePath();
    protected final UniqueDatabase DATABASE = new UniqueDatabase(SERVER_NAME, "incremental_snapshot-test").withDbHistoryPath(SCHEMA_HISTORY_PATH);

    protected static KafkaCluster kafka;

    @BeforeClass
    public static void startKafka() throws Exception {
        File dataDir = Testing.Files.createTestingDirectory("signal_cluster");
        Testing.Files.delete(dataDir);
        kafka = new KafkaCluster().usingDirectory(dataDir)
                .deleteDataPriorToStartup(true)
                .deleteDataUponShutdown(true)
                .addBrokers(1)
                .withKafkaConfiguration(Collect.propertiesOf(
                        "auto.create.topics.enable", "false",
                        "zookeeper.session.timeout.ms", "20000"))
                .startup();
        kafka.createTopic("signals_topic", 1, 1);
    }

    @AfterClass
    public static void stopKafka() {
        if (kafka != null) {
            kafka.shutdown();
        }
    }

    @Before
    public void before() throws SQLException {
        stopConnector();
        DATABASE.createAndInitialize();
        initializeConnectorTestFramework();
        Testing.Files.delete(SCHEMA_HISTORY_PATH);
    }

    @After
    public void after() {
        try {
            stopConnector();
        }
        finally {
            Testing.Files.delete(SCHEMA_HISTORY_PATH);
        }
    }

    protected Configuration.Builder config() {
        return DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
                .with(MySqlConnectorConfig.USER, "mysqluser")
                .with(MySqlConnectorConfig.PASSWORD, "mysqlpw")
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY.getValue())
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(MySqlConnectorConfig.SIGNAL_DATA_COLLECTION, DATABASE.qualifiedTableName("debezium_signal"))
                .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 1)
                .with(MySqlConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)
                .with(MySqlConnectorConfig.INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES, true)
                .with(CommonConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE, CommonConnectorConfig.SchemaNameAdjustmentMode.AVRO);
    }

    @Test
    public void givenOffsetCommitDisabledAndASignalSentWithConnectorRunning_whenConnectorComesBackUp_thenAllSignalsAreCorrectlyProcessed()
            throws ExecutionException, InterruptedException {
        final LogInterceptor logInterceptor = new LogInterceptor(ExecuteSnapshot.class);
        startConnector(x -> x.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka")
                .with(KafkaSignalChannel.SIGNAL_TOPIC, getSignalsTopic())
                .with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList()));
        assertConnectorIsRunning();
        sendExecuteSnapshotKafkaSignal("b");
        waitForAvailableRecords(1000, TimeUnit.MILLISECONDS);
        Thread.sleep(5000);
        final SourceRecords records = consumeRecordsByTopic(2);
        assertThat(records.allRecordsInOrder()).hasSize(2);
        assertThat(logInterceptor.containsMessage("Requested 'INCREMENTAL' snapshot of data collections '[b]'")).isTrue();
    }

    @Test
    public void givenOffsetCommitDisabledAndASignalSentWithConnectorDown_whenConnectorComesBackUp_thenNoSignalsProcessed()
            throws ExecutionException, InterruptedException {
        final LogInterceptor logInterceptor = new LogInterceptor(ExecuteSnapshot.class);
        sendExecuteSnapshotKafkaSignal("b");
        Thread.sleep(5000);
        startConnector(x -> x.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka")
                .with(KafkaSignalChannel.SIGNAL_TOPIC, getSignalsTopic())
                .with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList()));
        assertConnectorIsRunning();
        waitForAvailableRecords(1000, TimeUnit.MILLISECONDS);
        final SourceRecords records = consumeRecordsByTopic(2);
        assertThat(records.allRecordsInOrder()).hasSize(0);
        assertThat(logInterceptor.countOccurrences("Requested 'INCREMENTAL' snapshot of data collections '[b]'")).isEqualTo(0);
    }

    @Test
    public void givenOffsetCommitEnabledAndASignalSentWithConnectorRunning_whenConnectorComesBackUp_thenAllSignalsAreCorrectlyProcessed()
            throws ExecutionException, InterruptedException {
        final LogInterceptor logInterceptor = new LogInterceptor(ExecuteSnapshot.class);
        startConnector(x -> x.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka")
                .with(KafkaSignalChannel.SIGNAL_TOPIC, getSignalsTopic())
                .with(KafkaSignalChannel.SIGNAL_CONSUMER_OFFSET_COMMIT_ENABLED, true)
                .with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList()));
        assertConnectorIsRunning();
        sendExecuteSnapshotKafkaSignal("b");
        waitForAvailableRecords(1000, TimeUnit.MILLISECONDS);
        assertThat(logInterceptor.containsMessage("Requested 'INCREMENTAL' snapshot of data collections '[b]'")).isTrue();
    }

    @Test
    public void givenOffsetCommitEnabledAndMultipleSignalsSentWithConnectorRunning_whenConnectorComesBackUp_thenAllSignalsAreCorrectlyProcessed()
            throws ExecutionException, InterruptedException {
        final LogInterceptor logInterceptor = new LogInterceptor(ExecuteSnapshot.class);
        sendExecuteSnapshotKafkaSignal("b");
        startConnector(x -> x.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka")
                .with(KafkaSignalChannel.SIGNAL_TOPIC, getSignalsTopic())
                .with(KafkaSignalChannel.SIGNAL_CONSUMER_OFFSET_COMMIT_ENABLED, true)
                .with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList()));
        assertConnectorIsRunning();
        waitForAvailableRecords(1000, TimeUnit.MILLISECONDS);
        stopConnector();
        Thread.sleep(5000);
        sendExecuteSnapshotKafkaSignal("c");
        startConnector(x -> x.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka")
                .with(KafkaSignalChannel.SIGNAL_TOPIC, getSignalsTopic())
                .with(KafkaSignalChannel.SIGNAL_CONSUMER_OFFSET_COMMIT_ENABLED, true)
                .with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList()));
        assertConnectorIsRunning();
        waitForAvailableRecords(1000, TimeUnit.MILLISECONDS);
        assertThat(logInterceptor.containsMessage("Requested 'INCREMENTAL' snapshot of data collections '[b]'")).isTrue();
        assertThat(logInterceptor.containsMessage("Requested 'INCREMENTAL' snapshot of data collections '[c]'")).isTrue();
    }

    @Test
    public void givenOffsetCommitEnabledAndASignalSentWithConnectorNotRunning_whenConnectorComesBackUp_thenAllSignalsAreCorrectlyProcessed()
            throws ExecutionException, InterruptedException {
        final LogInterceptor logInterceptor = new LogInterceptor(ExecuteSnapshot.class);
        sendExecuteSnapshotKafkaSignal("b");
        startConnector(x -> x.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka")
                .with(KafkaSignalChannel.SIGNAL_TOPIC, getSignalsTopic())
                .with(KafkaSignalChannel.SIGNAL_CONSUMER_OFFSET_COMMIT_ENABLED, true)
                .with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList()));
        assertConnectorIsRunning();
        assertThat(logInterceptor.containsMessage("Requested 'INCREMENTAL' snapshot of data collections '[b]'")).isTrue();
    }

    protected void startConnector(Function<Configuration.Builder, Configuration.Builder> custConfig) {
        startConnector(custConfig, loggingCompletion());
    }

    protected void startConnector(Function<Configuration.Builder, Configuration.Builder> custConfig,
                                  DebeziumEngine.CompletionCallback callback) {
        final Configuration config = custConfig.apply(config()).build();
        start(MySqlConnector.class, config, callback);
        assertConnectorIsRunning();
        waitForAvailableRecords(5, TimeUnit.SECONDS);
        assertNoRecordsToConsume();
    }

    protected void sendExecuteSnapshotKafkaSignal(final String table) throws ExecutionException, InterruptedException {
        String signalValue = String.format(
                "{\"type\":\"execute-snapshot\",\"data\": {\"data-collections\": [\"%s\"], \"type\": \"INCREMENTAL\"}}",
                table);
        sendKafkaSignal(signalValue);
    }

    protected String getSignalsTopic() {
        return "signals_topic";
    }

    protected void sendKafkaSignal(String signalValue) throws ExecutionException, InterruptedException {
        final ProducerRecord<String, String> executeSnapshotSignal = new ProducerRecord<>(getSignalsTopic(), 0, SERVER_NAME, signalValue);
        final Configuration signalProducerConfig = Configuration.create()
                .withDefault(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.brokerList())
                .withDefault(ProducerConfig.CLIENT_ID_CONFIG, "signals")
                .withDefault(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                .withDefault(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                .build();
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(signalProducerConfig.asProperties())) {
            producer.send(executeSnapshotSignal).get();
        }
    }
}

View File

@@ -91,6 +91,15 @@ public class KafkaSignalChannel implements SignalChannelReader {
             .withImportance(ConfigDef.Importance.LOW)
             .withDescription("Consumer group id for the signal topic")
             .withDefault("kafka-signal");
+    public static final Field SIGNAL_CONSUMER_OFFSET_COMMIT_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "kafka.consumer.offset.commit.enabled")
+            .withDisplayName("Enable offset commit for the signal topic")
+            .withType(ConfigDef.Type.BOOLEAN)
+            .withDefault(false)
+            .withWidth(ConfigDef.Width.SHORT)
+            .withImportance(ConfigDef.Importance.MEDIUM)
+            .withDescription("Enable the offset commit for the signal topic in order to guarantee at-least-once delivery. " +
+                    "If disabled, only signals received when the consumer is up and running are processed. Any signals received when the consumer is down are lost.")
+            .withValidation(Field::isRequired);

     private Optional<SignalRecord> processSignal(ConsumerRecord<String, String> record) {
@@ -163,7 +172,7 @@ public void reset(Object reference) {
     private static Configuration buildKafkaConfiguration(Configuration signalConfig) {
-        return signalConfig.subset(CONSUMER_PREFIX, true).edit()
+        Configuration.Builder confBuilder = signalConfig.subset(CONSUMER_PREFIX, true).edit()
                 .withDefault(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, signalConfig.getString(BOOTSTRAP_SERVERS))
                 .withDefault(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString())
                 .withDefault(ConsumerConfig.GROUP_ID_CONFIG, signalConfig.getString(GROUP_ID))
@@ -171,7 +180,12 @@ private static Configuration buildKafkaConfiguration(Configuration signalConfig)
                 .withDefault(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)
                 .withDefault(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000) // readjusted since 0.10.1.0
                 .withDefault(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
-                .withDefault(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
+                .withDefault(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        if (signalConfig.getBoolean(SIGNAL_CONSUMER_OFFSET_COMMIT_ENABLED)) {
+            confBuilder.withDefault(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
+                    .withDefault(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        }
+        return confBuilder
                 .build();
     }
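Taken together: when the new option is enabled, the signal consumer commits its offsets and falls back to auto.offset.reset=earliest, so signals produced while the connector was down are replayed when it comes back up, which is what the tests above exercise. Below is a minimal sketch of enabling it on a connector configuration; the class name, topic name, and broker list are illustrative placeholders, and only the Field constants come from the code in this commit.

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.pipeline.signal.channels.KafkaSignalChannel;

// Hypothetical example class, not part of this commit.
public class SignalOffsetCommitExample {

    // Builds the signal-channel portion of a connector configuration with
    // at-least-once signal delivery enabled. Topic and broker values are
    // placeholders, not Debezium defaults.
    static Configuration signalChannelConfig() {
        return Configuration.create()
                .with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka")
                .with(KafkaSignalChannel.SIGNAL_TOPIC, "signals_topic")
                .with(KafkaSignalChannel.BOOTSTRAP_SERVERS, "localhost:9092")
                // New in this commit: commit the consumer offset so signals sent
                // while the connector is down are processed after a restart.
                .with(KafkaSignalChannel.SIGNAL_CONSUMER_OFFSET_COMMIT_ENABLED, true)
                .build();
    }
}

In plain connector properties this corresponds to setting kafka.consumer.offset.commit.enabled under the signal configuration prefix (i.e. signal.kafka.consumer.offset.commit.enabled=true, assuming the channel's usual "signal." prefix) alongside the existing signal.kafka.* options.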

View File

@@ -227,8 +227,8 @@ j2gg0s,Yanjie Wang
 REMY David,David Remy
 tyrantlucifer,Chao Tian
 ryanvanhuuksloot,Ryan van Huuksloot
-vsantona,Vincenzo Santoynastaso
-“vsantonastaso”,Vincenzo Santoynastaso
+vsantona,Vincenzo Santonastaso
+“vsantonastaso”,Vincenzo Santonastaso
 rolevinks,Stein Rolevink
 matan-cohen,Matan Cohen
 BigGillyStyle,Andy Pickler