DBZ-4727 Add Kafka based pause/resume signals

Add pause and resume snapshot signals. See the previous commit for more
details. These signals do the same, but are used by the read-only MySQL
connector, where signals are sent as Kafka messages.
Vojtech Juranek 2022-07-22 11:41:25 +02:00 committed by Chris Cranford
parent 3968b6b75f
commit 2c505c08f5
5 changed files with 118 additions and 6 deletions
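
For orientation, the pause and resume signals this commit handles arrive as plain JSON messages on the connector's signaling Kafka topic, keyed by the connector's server name. A minimal sender sketch, mirroring the test helpers at the end of this diff (the broker address, topic name "dbz-signals", and server name "dbserver1" are assumptions, not values from the commit):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PauseResumeSignalSender {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Records are keyed by the connector's server name, as the tests below do.
            producer.send(new ProducerRecord<>("dbz-signals", 0, "dbserver1",
                    "{\"type\":\"pause-snapshot\",\"data\": {\"type\": \"INCREMENTAL\"}}")).get();
            producer.send(new ProducerRecord<>("dbz-signals", 0, "dbserver1",
                    "{\"type\":\"resume-snapshot\",\"data\": {\"type\": \"INCREMENTAL\"}}")).get();
        }
    }
}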

MySqlReadOnlyIncrementalSnapshotChangeEventSource.java

@@ -9,13 +9,15 @@
import java.util.List;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.connector.mysql.signal.ExecuteSnapshotKafkaSignal;
import io.debezium.connector.mysql.signal.KafkaSignal;
import io.debezium.connector.mysql.signal.KafkaSignalThread;
import io.debezium.connector.mysql.signal.PauseSnapshotKafkaSignal;
import io.debezium.connector.mysql.signal.ResumeSnapshotKafkaSignal;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource;
@@ -160,6 +162,14 @@ public void enqueueDataCollectionNamesToSnapshot(List<String> dataCollectionIds,
getContext().enqueueKafkaSignal(new ExecuteSnapshotKafkaSignal(dataCollectionIds, signalOffset));
}
public void enqueuePauseSnapshot() {
getContext().enqueueKafkaSignal(new PauseSnapshotKafkaSignal());
}
public void enqueueResumeSnapshot() {
getContext().enqueueKafkaSignal(new ResumeSnapshotKafkaSignal());
}
@Override
public void processTransactionStartedEvent(MySqlPartition partition, OffsetContext offsetContext) throws InterruptedException {
if (getContext() == null) {
@@ -241,6 +251,15 @@ private void checkEnqueuedSnapshotSignals(MySqlPartition partition, OffsetContext
if (signal instanceof ExecuteSnapshotKafkaSignal) {
addDataCollectionNamesToSnapshot((ExecuteSnapshotKafkaSignal) signal, partition, offsetContext);
}
else if (signal instanceof PauseSnapshotKafkaSignal) {
pauseSnapshot(partition, offsetContext);
}
else if (signal instanceof ResumeSnapshotKafkaSignal) {
resumeSnapshot(partition, offsetContext);
}
else {
throw new IllegalArgumentException("Unknown Kafka signal " + signal);
}
}
}
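
In the hunk above, signals enqueued by the Kafka reader thread are drained and dispatched on the streaming side, so pause/resume take effect at well-defined points in event processing. A minimal sketch of that handoff pattern (the queue type and class names are illustrative stand-ins, not the actual context implementation):

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class SignalQueueSketch {
    // Hypothetical stand-ins for the marker signals added by this commit.
    interface KafkaSignal { }
    static class PauseSnapshotKafkaSignal implements KafkaSignal { }
    static class ResumeSnapshotKafkaSignal implements KafkaSignal { }

    private final Queue<KafkaSignal> signals = new ConcurrentLinkedQueue<>();

    // Called from the Kafka signal thread.
    public void enqueueKafkaSignal(KafkaSignal signal) {
        signals.add(signal);
    }

    // Called from the streaming thread, e.g. at transaction boundaries.
    public void checkEnqueuedSnapshotSignals() {
        KafkaSignal signal;
        while ((signal = signals.poll()) != null) {
            if (signal instanceof PauseSnapshotKafkaSignal) {
                System.out.println("pausing incremental snapshot");
            }
            else if (signal instanceof ResumeSnapshotKafkaSignal) {
                System.out.println("resuming incremental snapshot");
            }
        }
    }
}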

KafkaSignalThread.java

@@ -29,6 +29,8 @@
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.pipeline.signal.ExecuteSnapshot;
import io.debezium.pipeline.signal.PauseIncrementalSnapshot;
import io.debezium.pipeline.signal.ResumeIncrementalSnapshot;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Collect;
import io.debezium.util.Threads;
@@ -147,11 +149,18 @@ private void processSignal(ConsumerRecord<String, String> record) throws IOException
: DocumentReader.defaultReader().read(value);
String type = jsonData.getString("type");
Document data = jsonData.getDocument("data");
if (ExecuteSnapshot.NAME.equals(type)) {
executeSnapshot(data, record.offset());
}
else {
LOGGER.warn("Unknown signal type {}", type);
switch (type) {
case ExecuteSnapshot.NAME:
executeSnapshot(data, record.offset());
break;
case PauseIncrementalSnapshot.NAME:
executePause(data);
break;
case ResumeIncrementalSnapshot.NAME:
executeResume(data);
break;
default:
LOGGER.warn("Unknown signal type {}", type);
}
}
@@ -166,6 +175,22 @@ private void executeSnapshot(Document data, long signalOffset) {
}
}
private void executePause(Document data) {
PauseIncrementalSnapshot.SnapshotType snapshotType = ExecuteSnapshot.getSnapshotType(data);
LOGGER.info("Requested snapshot pause");
if (snapshotType == PauseIncrementalSnapshot.SnapshotType.INCREMENTAL) {
eventSource.enqueuePauseSnapshot();
}
}
private void executeResume(Document data) {
ResumeIncrementalSnapshot.SnapshotType snapshotType = ExecuteSnapshot.getSnapshotType(data);
LOGGER.info("Requested snapshot resume");
if (snapshotType == ResumeIncrementalSnapshot.SnapshotType.INCREMENTAL) {
eventSource.enqueueResumeSnapshot();
}
}
public void seek(long signalOffset) {
signalsConsumer.seek(new TopicPartition(topicName, 0), signalOffset + 1);
}
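
processSignal() above parses each record value with Debezium's Document API and switches on the type field. A standalone sketch of just that parsing step (the JSON literal matches the signal format used by the tests below):

import java.io.IOException;

import io.debezium.document.Document;
import io.debezium.document.DocumentReader;

public class SignalParseSketch {
    public static void main(String[] args) throws IOException {
        Document jsonData = DocumentReader.defaultReader()
                .read("{\"type\":\"pause-snapshot\",\"data\": {\"type\": \"INCREMENTAL\"}}");
        String type = jsonData.getString("type");     // "pause-snapshot"
        Document data = jsonData.getDocument("data"); // {"type": "INCREMENTAL"}
        System.out.println(type + " -> " + data);
    }
}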

PauseSnapshotKafkaSignal.java

@@ -0,0 +1,9 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql.signal;
public class PauseSnapshotKafkaSignal implements KafkaSignal {
}

ResumeSnapshotKafkaSignal.java

@@ -0,0 +1,9 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql.signal;
public class ResumeSnapshotKafkaSignal implements KafkaSignal {
}
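
Both new classes are empty; they exist only so the instanceof dispatch in checkEnqueuedSnapshotSignals can tell the signal kinds apart. The KafkaSignal interface they implement is not part of this diff; it is presumably a plain marker interface along these lines (a sketch, not the actual source):

package io.debezium.connector.mysql.signal;

/**
 * Marker interface for signals delivered via the Kafka signaling topic.
 * Sketch only; the real interface is not shown in this commit.
 */
public interface KafkaSignal {
}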

ReadOnlyIncrementalSnapshotIT.java

@@ -7,8 +7,11 @@
import java.io.File;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -17,6 +20,7 @@
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.fest.assertions.MapAssert;
import org.junit.AfterClass;
@@ -26,6 +30,7 @@
import org.junit.Test;
import org.junit.rules.TestRule;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.junit.SkipTestDependingOnGtidModeRule;
import io.debezium.connector.mysql.junit.SkipWhenGtidModeIs;
@@ -94,6 +99,18 @@ protected void sendExecuteSnapshotKafkaSignal(String fullTableNames) throws ExecutionException, InterruptedException {
String signalValue = String.format(
"{\"type\":\"execute-snapshot\",\"data\": {\"data-collections\": [\"%s\"], \"type\": \"INCREMENTAL\"}}",
fullTableNames);
sendKafkaSignal(signalValue);
}
protected void sendPauseSnapshotKafkaSignal() throws ExecutionException, InterruptedException {
sendKafkaSignal("{\"type\":\"pause-snapshot\",\"data\": {\"type\": \"INCREMENTAL\"}}");
}
protected void sendResumeSnapshotKafkaSignal() throws ExecutionException, InterruptedException {
sendKafkaSignal("{\"type\":\"resume-snapshot\",\"data\": {\"type\": \"INCREMENTAL\"}}");
}
protected void sendKafkaSignal(String signalValue) throws ExecutionException, InterruptedException {
final ProducerRecord<String, String> executeSnapshotSignal = new ProducerRecord<>(getSignalsTopic(), PARTITION_NO, SERVER_NAME, signalValue);
final Configuration signalProducerConfig = Configuration.create()
@@ -221,6 +238,39 @@ public void shouldFailIfGtidModeIsOff() throws Exception {
}
}
@Test
public void testPauseDuringSnapshotKafkaSignal() throws Exception {
populateTable();
startConnector(x -> x.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1));
waitForConnectorToStart();
waitForAvailableRecords(1, TimeUnit.SECONDS);
// there shouldn't be any snapshot records
assertNoRecordsToConsume();
sendExecuteSnapshotKafkaSignal();
List<SourceRecord> records = new ArrayList<>();
String topicName = topicName();
Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(100);
sendPauseSnapshotKafkaSignal();
consumeAvailableRecords(record -> {
if (topicName.equalsIgnoreCase(record.topic())) {
records.add(record);
}
});
int beforeResume = records.size() + dbChanges.size();
sendResumeSnapshotKafkaSignal();
dbChanges = consumeMixedWithIncrementalSnapshot(ROW_COUNT - beforeResume);
for (int i = beforeResume + 1; i < ROW_COUNT; i++) {
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
}
}
protected void populate4PkTable() throws SQLException {
try (final JdbcConnection connection = databaseConnection()) {
populate4PkTable(connection, "a4");