DBZ-6566 Support Blocking snapshot for SQLServer
This commit is contained in:
parent
def7379a0d
commit
f883a5c9a3
@ -104,6 +104,12 @@
|
|||||||
<groupId>io.apicurio</groupId>
|
<groupId>io.apicurio</groupId>
|
||||||
<artifactId>apicurio-registry-utils-converter</artifactId>
|
<artifactId>apicurio-registry-utils-converter</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
|
<exclusions>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>slf4j-jboss-logmanager</artifactId>
|
||||||
|
<groupId>org.jboss.slf4j</groupId>
|
||||||
|
</exclusion>
|
||||||
|
</exclusions>
|
||||||
</dependency>
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
|
@ -109,9 +109,7 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps
|
|||||||
previousLogContext.set(taskContext.configureLoggingContext("streaming", partition));
|
previousLogContext.set(taskContext.configureLoggingContext("streaming", partition));
|
||||||
|
|
||||||
if (context.isRunning()) {
|
if (context.isRunning()) {
|
||||||
if (streamingSource.executeIteration(context, partition, previousOffset)) {
|
streamedEvents = streamingSource.executeIteration(context, partition, previousOffset);
|
||||||
streamedEvents = true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -122,6 +120,14 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps
|
|||||||
if (errorHandler.getProducerThrowable() == null) {
|
if (errorHandler.getProducerThrowable() == null) {
|
||||||
firstStreamingIterationCompletedSuccessfully.set(true);
|
firstStreamingIterationCompletedSuccessfully.set(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (context.isPaused()) {
|
||||||
|
LOGGER.info("Streaming will now pause");
|
||||||
|
context.streamingPaused();
|
||||||
|
context.waitSnapshotCompletion();
|
||||||
|
LOGGER.info("Streaming resumed");
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
LOGGER.info("Finished streaming");
|
LOGGER.info("Finished streaming");
|
||||||
|
@ -63,7 +63,7 @@ protected SnapshottingTask getSnapshottingTask(SqlServerPartition partition, Sql
|
|||||||
boolean snapshotData = true;
|
boolean snapshotData = true;
|
||||||
|
|
||||||
// found a previous offset and the earlier snapshot has completed
|
// found a previous offset and the earlier snapshot has completed
|
||||||
if (previousOffset != null && !previousOffset.isSnapshotRunning()) {
|
if (previousOffset != null && !previousOffset.isSnapshotRunning() && false /* TODO check if streaming is pause */) {
|
||||||
LOGGER.info("A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted.");
|
LOGGER.info("A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted.");
|
||||||
snapshotSchema = false;
|
snapshotSchema = false;
|
||||||
snapshotData = false;
|
snapshotData = false;
|
||||||
|
@ -0,0 +1,142 @@
|
|||||||
|
/*
|
||||||
|
* Copyright Debezium Authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.debezium.connector.sqlserver;
|
||||||
|
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
|
||||||
|
import io.debezium.config.Configuration;
|
||||||
|
import io.debezium.connector.sqlserver.util.TestHelper;
|
||||||
|
import io.debezium.jdbc.JdbcConnection;
|
||||||
|
import io.debezium.pipeline.AbstractBlockingSnapshotTest;
|
||||||
|
import io.debezium.relational.history.SchemaHistory;
|
||||||
|
import io.debezium.util.Testing;
|
||||||
|
|
||||||
|
public class BlockingSnapshotIT extends AbstractBlockingSnapshotTest {
|
||||||
|
|
||||||
|
private static final int POLLING_INTERVAL = 1;
|
||||||
|
|
||||||
|
private SqlServerConnection connection;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void before() throws SQLException {
|
||||||
|
TestHelper.createTestDatabase();
|
||||||
|
connection = TestHelper.testConnection();
|
||||||
|
connection.execute(
|
||||||
|
"CREATE TABLE a (pk int primary key, aa int)",
|
||||||
|
"CREATE TABLE debezium_signal (id varchar(64), type varchar(32), data varchar(2048))");
|
||||||
|
TestHelper.enableTableCdc(connection, "debezium_signal");
|
||||||
|
TestHelper.adjustCdcPollingInterval(connection, POLLING_INTERVAL);
|
||||||
|
|
||||||
|
initializeConnectorTestFramework();
|
||||||
|
Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void after() throws SQLException {
|
||||||
|
if (connection != null) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void populateTable() throws SQLException {
|
||||||
|
super.populateTable();
|
||||||
|
TestHelper.enableTableCdc(connection, "a");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Class<SqlServerConnector> connectorClass() {
|
||||||
|
return SqlServerConnector.class;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected JdbcConnection databaseConnection() {
|
||||||
|
return connection;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String topicName() {
|
||||||
|
return "server1.testDB1.dbo.a";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<String> topicNames() {
|
||||||
|
return List.of("server1.testDB1.dbo.a");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String tableName() {
|
||||||
|
return "testDB1.dbo.a";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<String> tableNames() {
|
||||||
|
return List.of("testDB1.dbo.a");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String signalTableName() {
|
||||||
|
return "dbo.debezium_signal";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Configuration.Builder config() {
|
||||||
|
return TestHelper.defaultConfig()
|
||||||
|
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
|
||||||
|
.with(SqlServerConnectorConfig.SIGNAL_DATA_COLLECTION, "testDB1.dbo.debezium_signal");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl) {
|
||||||
|
final String tableIncludeList;
|
||||||
|
if (signalTableOnly) {
|
||||||
|
tableIncludeList = "dbo.b";
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
tableIncludeList = "dbo.a," + signalTableName();
|
||||||
|
}
|
||||||
|
return TestHelper.defaultConfig()
|
||||||
|
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
|
||||||
|
.with(SqlServerConnectorConfig.SIGNAL_DATA_COLLECTION, "testDB1.dbo.debezium_signal")
|
||||||
|
.with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, tableIncludeList)
|
||||||
|
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void waitForCdcTransactionPropagation(int expectedTransactions) throws Exception {
|
||||||
|
TestHelper.waitForCdcTransactionPropagation(connection, TestHelper.TEST_DATABASE_1, expectedTransactions);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String connector() {
|
||||||
|
return "sql_server";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String server() {
|
||||||
|
return TestHelper.TEST_SERVER_NAME;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String task() {
|
||||||
|
return "0";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String database() {
|
||||||
|
return TestHelper.TEST_DATABASE_1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected int insertMaxSleep() {
|
||||||
|
return 100;
|
||||||
|
}
|
||||||
|
}
|
@ -39,7 +39,6 @@
|
|||||||
import io.debezium.junit.logging.LogInterceptor;
|
import io.debezium.junit.logging.LogInterceptor;
|
||||||
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
|
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
|
||||||
import io.debezium.pipeline.source.snapshot.incremental.AbstractSnapshotTest;
|
import io.debezium.pipeline.source.snapshot.incremental.AbstractSnapshotTest;
|
||||||
import io.debezium.util.Testing;
|
|
||||||
|
|
||||||
public abstract class AbstractBlockingSnapshotTest extends AbstractSnapshotTest {
|
public abstract class AbstractBlockingSnapshotTest extends AbstractSnapshotTest {
|
||||||
private int signalingRecords;
|
private int signalingRecords;
|
||||||
@ -123,10 +122,14 @@ public void executeBlockingSnapshotWhileStreaming() throws Exception {
|
|||||||
AbstractBlockingSnapshotTest.getExpectedValues(totalSnapshotRecords));
|
AbstractBlockingSnapshotTest.getExpectedValues(totalSnapshotRecords));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected int insertMaxSleep() {
|
||||||
|
return 2;
|
||||||
|
}
|
||||||
|
|
||||||
private Runnable insertTask() {
|
private Runnable insertTask() {
|
||||||
return () -> {
|
return () -> {
|
||||||
try {
|
try {
|
||||||
insertRecordsWithRandomSleep(ROW_COUNT, ROW_COUNT, 2);
|
insertRecordsWithRandomSleep(ROW_COUNT, ROW_COUNT, insertMaxSleep());
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
@ -155,8 +158,8 @@ private static List<Integer> getExpectedValues(Long totalSnapshotRecords) {
|
|||||||
List<Integer> initialSnapShotValues = IntStream.rangeClosed(0, 999).boxed().collect(Collectors.toList());
|
List<Integer> initialSnapShotValues = IntStream.rangeClosed(0, 999).boxed().collect(Collectors.toList());
|
||||||
List<Integer> firstStreamingBatchValues = IntStream.rangeClosed(1000, 1999).boxed().collect(Collectors.toList());
|
List<Integer> firstStreamingBatchValues = IntStream.rangeClosed(1000, 1999).boxed().collect(Collectors.toList());
|
||||||
List<Integer> blockingSnapshotValues = Stream.of(
|
List<Integer> blockingSnapshotValues = Stream.of(
|
||||||
initialSnapShotValues,
|
initialSnapShotValues,
|
||||||
IntStream.rangeClosed(1000, Math.toIntExact(totalSnapshotRecords)).boxed().collect(Collectors.toList())).flatMap(List::stream)
|
IntStream.rangeClosed(1000, Math.toIntExact(totalSnapshotRecords)).boxed().collect(Collectors.toList())).flatMap(List::stream)
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
List<Integer> secondStreamingBatchValues = IntStream.rangeClosed(2000, 2999).boxed().collect(Collectors.toList());
|
List<Integer> secondStreamingBatchValues = IntStream.rangeClosed(2000, 2999).boxed().collect(Collectors.toList());
|
||||||
return Stream.of(initialSnapShotValues, firstStreamingBatchValues, blockingSnapshotValues, secondStreamingBatchValues).flatMap(List::stream)
|
return Stream.of(initialSnapShotValues, firstStreamingBatchValues, blockingSnapshotValues, secondStreamingBatchValues).flatMap(List::stream)
|
||||||
@ -225,9 +228,8 @@ private void insertRecordsWithRandomSleep(int rowCount, int startingPkId, int ma
|
|||||||
int sleepTime = ThreadLocalRandom.current().nextInt(1, maxSleep);
|
int sleepTime = ThreadLocalRandom.current().nextInt(1, maxSleep);
|
||||||
Thread.sleep(sleepTime);
|
Thread.sleep(sleepTime);
|
||||||
}
|
}
|
||||||
Testing.debug(String.format("Insert of %s records completed", rowCount));
|
|
||||||
}
|
}
|
||||||
catch (InterruptedException e) {
|
catch (Exception e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user