diff --git a/debezium-connector-sqlserver/pom.xml b/debezium-connector-sqlserver/pom.xml
index d6c9c9373..d8af5cc95 100644
--- a/debezium-connector-sqlserver/pom.xml
+++ b/debezium-connector-sqlserver/pom.xml
@@ -104,6 +104,12 @@
io.apicurio
apicurio-registry-utils-converter
test
+
+
+ slf4j-jboss-logmanager
+ org.jboss.slf4j
+
+
diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeEventSourceCoordinator.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeEventSourceCoordinator.java
index 15115c1e1..e6137735d 100644
--- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeEventSourceCoordinator.java
+++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeEventSourceCoordinator.java
@@ -109,9 +109,7 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps
previousLogContext.set(taskContext.configureLoggingContext("streaming", partition));
if (context.isRunning()) {
- if (streamingSource.executeIteration(context, partition, previousOffset)) {
- streamedEvents = true;
- }
+ streamedEvents = streamingSource.executeIteration(context, partition, previousOffset);
}
}
@@ -122,6 +120,14 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps
if (errorHandler.getProducerThrowable() == null) {
firstStreamingIterationCompletedSuccessfully.set(true);
}
+
+ if (context.isPaused()) {
+ LOGGER.info("Streaming will now pause");
+ context.streamingPaused();
+ context.waitSnapshotCompletion();
+ LOGGER.info("Streaming resumed");
+ }
+
}
LOGGER.info("Finished streaming");
diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java
index c74272d28..06c7798b7 100644
--- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java
+++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java
@@ -63,7 +63,7 @@ protected SnapshottingTask getSnapshottingTask(SqlServerPartition partition, Sql
boolean snapshotData = true;
// found a previous offset and the earlier snapshot has completed
- if (previousOffset != null && !previousOffset.isSnapshotRunning()) {
+ if (previousOffset != null && !previousOffset.isSnapshotRunning() && false /* TODO check if streaming is pause */) {
LOGGER.info("A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted.");
snapshotSchema = false;
snapshotData = false;
diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/BlockingSnapshotIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/BlockingSnapshotIT.java
new file mode 100644
index 000000000..2a439e557
--- /dev/null
+++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/BlockingSnapshotIT.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.connector.sqlserver;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+
+import io.debezium.config.Configuration;
+import io.debezium.connector.sqlserver.util.TestHelper;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.pipeline.AbstractBlockingSnapshotTest;
+import io.debezium.relational.history.SchemaHistory;
+import io.debezium.util.Testing;
+
+public class BlockingSnapshotIT extends AbstractBlockingSnapshotTest {
+
+ private static final int POLLING_INTERVAL = 1;
+
+ private SqlServerConnection connection;
+
+ @Before
+ public void before() throws SQLException {
+ TestHelper.createTestDatabase();
+ connection = TestHelper.testConnection();
+ connection.execute(
+ "CREATE TABLE a (pk int primary key, aa int)",
+ "CREATE TABLE debezium_signal (id varchar(64), type varchar(32), data varchar(2048))");
+ TestHelper.enableTableCdc(connection, "debezium_signal");
+ TestHelper.adjustCdcPollingInterval(connection, POLLING_INTERVAL);
+
+ initializeConnectorTestFramework();
+ Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
+ }
+
+ @After
+ public void after() throws SQLException {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+
+ @Override
+ protected void populateTable() throws SQLException {
+ super.populateTable();
+ TestHelper.enableTableCdc(connection, "a");
+ }
+
+ @Override
+ protected Class connectorClass() {
+ return SqlServerConnector.class;
+ }
+
+ @Override
+ protected JdbcConnection databaseConnection() {
+ return connection;
+ }
+
+ @Override
+ protected String topicName() {
+ return "server1.testDB1.dbo.a";
+ }
+
+ @Override
+ protected List topicNames() {
+ return List.of("server1.testDB1.dbo.a");
+ }
+
+ @Override
+ protected String tableName() {
+ return "testDB1.dbo.a";
+ }
+
+ @Override
+ protected List tableNames() {
+ return List.of("testDB1.dbo.a");
+ }
+
+ @Override
+ protected String signalTableName() {
+ return "dbo.debezium_signal";
+ }
+
+ @Override
+ protected Configuration.Builder config() {
+ return TestHelper.defaultConfig()
+ .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
+ .with(SqlServerConnectorConfig.SIGNAL_DATA_COLLECTION, "testDB1.dbo.debezium_signal");
+ }
+
+ @Override
+ protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl) {
+ final String tableIncludeList;
+ if (signalTableOnly) {
+ tableIncludeList = "dbo.b";
+ }
+ else {
+ tableIncludeList = "dbo.a," + signalTableName();
+ }
+ return TestHelper.defaultConfig()
+ .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
+ .with(SqlServerConnectorConfig.SIGNAL_DATA_COLLECTION, "testDB1.dbo.debezium_signal")
+ .with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, tableIncludeList)
+ .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl);
+ }
+
+ @Override
+ protected void waitForCdcTransactionPropagation(int expectedTransactions) throws Exception {
+ TestHelper.waitForCdcTransactionPropagation(connection, TestHelper.TEST_DATABASE_1, expectedTransactions);
+ }
+
+ @Override
+ protected String connector() {
+ return "sql_server";
+ }
+
+ @Override
+ protected String server() {
+ return TestHelper.TEST_SERVER_NAME;
+ }
+
+ @Override
+ protected String task() {
+ return "0";
+ }
+
+ @Override
+ protected String database() {
+ return TestHelper.TEST_DATABASE_1;
+ }
+
+ @Override
+ protected int insertMaxSleep() {
+ return 100;
+ }
+}
diff --git a/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractBlockingSnapshotTest.java b/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractBlockingSnapshotTest.java
index 0f4be1771..ae2eefde0 100644
--- a/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractBlockingSnapshotTest.java
+++ b/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractBlockingSnapshotTest.java
@@ -39,7 +39,6 @@
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.snapshot.incremental.AbstractSnapshotTest;
-import io.debezium.util.Testing;
public abstract class AbstractBlockingSnapshotTest extends AbstractSnapshotTest {
private int signalingRecords;
@@ -123,10 +122,14 @@ public void executeBlockingSnapshotWhileStreaming() throws Exception {
AbstractBlockingSnapshotTest.getExpectedValues(totalSnapshotRecords));
}
+ protected int insertMaxSleep() {
+ return 2;
+ }
+
private Runnable insertTask() {
return () -> {
try {
- insertRecordsWithRandomSleep(ROW_COUNT, ROW_COUNT, 2);
+ insertRecordsWithRandomSleep(ROW_COUNT, ROW_COUNT, insertMaxSleep());
}
catch (Exception e) {
throw new RuntimeException(e);
@@ -155,8 +158,8 @@ private static List getExpectedValues(Long totalSnapshotRecords) {
List initialSnapShotValues = IntStream.rangeClosed(0, 999).boxed().collect(Collectors.toList());
List firstStreamingBatchValues = IntStream.rangeClosed(1000, 1999).boxed().collect(Collectors.toList());
List blockingSnapshotValues = Stream.of(
- initialSnapShotValues,
- IntStream.rangeClosed(1000, Math.toIntExact(totalSnapshotRecords)).boxed().collect(Collectors.toList())).flatMap(List::stream)
+ initialSnapShotValues,
+ IntStream.rangeClosed(1000, Math.toIntExact(totalSnapshotRecords)).boxed().collect(Collectors.toList())).flatMap(List::stream)
.collect(Collectors.toList());
List secondStreamingBatchValues = IntStream.rangeClosed(2000, 2999).boxed().collect(Collectors.toList());
return Stream.of(initialSnapShotValues, firstStreamingBatchValues, blockingSnapshotValues, secondStreamingBatchValues).flatMap(List::stream)
@@ -225,9 +228,8 @@ private void insertRecordsWithRandomSleep(int rowCount, int startingPkId, int ma
int sleepTime = ThreadLocalRandom.current().nextInt(1, maxSleep);
Thread.sleep(sleepTime);
}
- Testing.debug(String.format("Insert of %s records completed", rowCount));
}
- catch (InterruptedException e) {
+ catch (Exception e) {
throw new RuntimeException(e);
}
}