diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlReadOnlyIncrementalSnapshotChangeEventSource.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlReadOnlyIncrementalSnapshotChangeEventSource.java index 1e8c48b59..3bc6ec148 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlReadOnlyIncrementalSnapshotChangeEventSource.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlReadOnlyIncrementalSnapshotChangeEventSource.java @@ -9,13 +9,15 @@ import java.util.List; import java.util.function.Consumer; -import io.debezium.connector.mysql.signal.KafkaSignal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.debezium.DebeziumException; import io.debezium.connector.mysql.signal.ExecuteSnapshotKafkaSignal; +import io.debezium.connector.mysql.signal.KafkaSignal; import io.debezium.connector.mysql.signal.KafkaSignalThread; +import io.debezium.connector.mysql.signal.PauseSnapshotKafkaSignal; +import io.debezium.connector.mysql.signal.ResumeSnapshotKafkaSignal; import io.debezium.jdbc.JdbcConnection; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource; @@ -160,6 +162,14 @@ public void enqueueDataCollectionNamesToSnapshot(List dataCollectionIds, getContext().enqueueKafkaSignal(new ExecuteSnapshotKafkaSignal(dataCollectionIds, signalOffset)); } + public void enqueuePauseSnapshot() { + getContext().enqueueKafkaSignal(new PauseSnapshotKafkaSignal()); + } + + public void enqueueResumeSnapshot() { + getContext().enqueueKafkaSignal(new ResumeSnapshotKafkaSignal()); + } + @Override public void processTransactionStartedEvent(MySqlPartition partition, OffsetContext offsetContext) throws InterruptedException { if (getContext() == null) { @@ -241,6 +251,15 @@ private void checkEnqueuedSnapshotSignals(MySqlPartition partition, OffsetContex if (signal instanceof ExecuteSnapshotKafkaSignal) { addDataCollectionNamesToSnapshot((ExecuteSnapshotKafkaSignal) signal, partition, offsetContext); } + else if (signal instanceof PauseSnapshotKafkaSignal) { + pauseSnapshot(partition, offsetContext); + } + else if (signal instanceof ResumeSnapshotKafkaSignal) { + resumeSnapshot(partition, offsetContext); + } + else { + throw new IllegalArgumentException("Unknown Kafka signal " + signal); + } } } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/signal/KafkaSignalThread.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/signal/KafkaSignalThread.java index d3341b240..c0d878e77 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/signal/KafkaSignalThread.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/signal/KafkaSignalThread.java @@ -29,6 +29,8 @@ import io.debezium.document.Document; import io.debezium.document.DocumentReader; import io.debezium.pipeline.signal.ExecuteSnapshot; +import io.debezium.pipeline.signal.PauseIncrementalSnapshot; +import io.debezium.pipeline.signal.ResumeIncrementalSnapshot; import io.debezium.spi.schema.DataCollectionId; import io.debezium.util.Collect; import io.debezium.util.Threads; @@ -147,11 +149,18 @@ private void processSignal(ConsumerRecord record) throws IOExcep : DocumentReader.defaultReader().read(value); String type = jsonData.getString("type"); Document data = jsonData.getDocument("data"); - if (ExecuteSnapshot.NAME.equals(type)) { - executeSnapshot(data, record.offset()); - } - else { - LOGGER.warn("Unknown signal type {}", type); + switch (type) { + case ExecuteSnapshot.NAME: + executeSnapshot(data, record.offset()); + break; + case PauseIncrementalSnapshot.NAME: + executePause(data); + break; + case ResumeIncrementalSnapshot.NAME: + executeResume(data); + break; + default: + LOGGER.warn("Unknown signal type {}", type); } } @@ -166,6 +175,22 @@ private void executeSnapshot(Document data, long signalOffset) { } } + private void executePause(Document data) { + PauseIncrementalSnapshot.SnapshotType snapshotType = ExecuteSnapshot.getSnapshotType(data); + LOGGER.info("Requested snapshot pause"); + if (snapshotType == PauseIncrementalSnapshot.SnapshotType.INCREMENTAL) { + eventSource.enqueuePauseSnapshot(); + } + } + + private void executeResume(Document data) { + ResumeIncrementalSnapshot.SnapshotType snapshotType = ExecuteSnapshot.getSnapshotType(data); + LOGGER.info("Requested snapshot resume"); + if (snapshotType == ResumeIncrementalSnapshot.SnapshotType.INCREMENTAL) { + eventSource.enqueueResumeSnapshot(); + } + } + public void seek(long signalOffset) { signalsConsumer.seek(new TopicPartition(topicName, 0), signalOffset + 1); } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/signal/PauseSnapshotKafkaSignal.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/signal/PauseSnapshotKafkaSignal.java new file mode 100644 index 000000000..4e7540f85 --- /dev/null +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/signal/PauseSnapshotKafkaSignal.java @@ -0,0 +1,9 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mysql.signal; + +public class PauseSnapshotKafkaSignal implements KafkaSignal { +} diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/signal/ResumeSnapshotKafkaSignal.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/signal/ResumeSnapshotKafkaSignal.java new file mode 100644 index 000000000..b41b657c8 --- /dev/null +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/signal/ResumeSnapshotKafkaSignal.java @@ -0,0 +1,9 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mysql.signal; + +public class ResumeSnapshotKafkaSignal implements KafkaSignal { +} diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ReadOnlyIncrementalSnapshotIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ReadOnlyIncrementalSnapshotIT.java index df8e3c4b8..23d9a6903 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ReadOnlyIncrementalSnapshotIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ReadOnlyIncrementalSnapshotIT.java @@ -7,8 +7,11 @@ import java.io.File; import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.producer.KafkaProducer; @@ -17,6 +20,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.source.SourceRecord; import org.fest.assertions.Assertions; import org.fest.assertions.MapAssert; import org.junit.AfterClass; @@ -26,6 +30,7 @@ import org.junit.Test; import org.junit.rules.TestRule; +import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; import io.debezium.connector.mysql.junit.SkipTestDependingOnGtidModeRule; import io.debezium.connector.mysql.junit.SkipWhenGtidModeIs; @@ -94,6 +99,18 @@ protected void sendExecuteSnapshotKafkaSignal(String fullTableNames) throws Exec String signalValue = String.format( "{\"type\":\"execute-snapshot\",\"data\": {\"data-collections\": [\"%s\"], \"type\": \"INCREMENTAL\"}}", fullTableNames); + sendKafkaSignal(signalValue); + } + + protected void sendPauseSnapshotKafkaSignal() throws ExecutionException, InterruptedException { + sendKafkaSignal("{\"type\":\"pause-snapshot\",\"data\": {\"type\": \"INCREMENTAL\"}}"); + } + + protected void sendResumeSnapshotKafkaSignal() throws ExecutionException, InterruptedException { + sendKafkaSignal("{\"type\":\"resume-snapshot\",\"data\": {\"type\": \"INCREMENTAL\"}}"); + } + + protected void sendKafkaSignal(String signalValue) throws ExecutionException, InterruptedException { final ProducerRecord executeSnapshotSignal = new ProducerRecord<>(getSignalsTopic(), PARTITION_NO, SERVER_NAME, signalValue); final Configuration signalProducerConfig = Configuration.create() @@ -221,6 +238,39 @@ public void shouldFailIfGtidModeIsOff() throws Exception { } } + @Test + public void testPauseDuringSnapshotKafkaSignal() throws Exception { + populateTable(); + startConnector(x -> x.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1)); + waitForConnectorToStart(); + + waitForAvailableRecords(1, TimeUnit.SECONDS); + // there shouldn't be any snapshot records + assertNoRecordsToConsume(); + + sendExecuteSnapshotKafkaSignal(); + + List records = new ArrayList<>(); + String topicName = topicName(); + Map dbChanges = consumeMixedWithIncrementalSnapshot(100); + + sendPauseSnapshotKafkaSignal(); + + consumeAvailableRecords(record -> { + if (topicName.equalsIgnoreCase(record.topic())) { + records.add(record); + } + }); + int beforeResume = records.size() + dbChanges.size(); + + sendResumeSnapshotKafkaSignal(); + + dbChanges = consumeMixedWithIncrementalSnapshot(ROW_COUNT - beforeResume); + for (int i = beforeResume + 1; i < ROW_COUNT; i++) { + Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i)); + } + } + protected void populate4PkTable() throws SQLException { try (final JdbcConnection connection = databaseConnection()) { populate4PkTable(connection, "a4");