DBZ-3500 Incremental snapshots for SQL Server

This commit is contained in:
Jiri Pechanec 2021-05-07 11:10:43 +02:00 committed by Gunnar Morling
parent 258757410d
commit 90738f53a6
5 changed files with 158 additions and 9 deletions

View File

@ -5,14 +5,20 @@
*/
package io.debezium.connector.sqlserver;
import java.util.Optional;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import io.debezium.util.Clock;
public class SqlServerChangeEventSourceFactory implements ChangeEventSourceFactory {
@ -54,4 +60,19 @@ public StreamingChangeEventSource getStreamingChangeEventSource(OffsetContext of
clock,
schema);
}
@Override
public Optional<IncrementalSnapshotChangeEventSource<? extends DataCollectionId>> getIncrementalSnapshotChangeEventSource(
OffsetContext offsetContext,
SnapshotProgressListener snapshotProgressListener,
DataChangeEventListener dataChangeEventListener) {
final SignalBasedIncrementalSnapshotChangeEventSource<TableId> incrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource<TableId>(
configuration,
dataConnection,
schema,
clock,
snapshotProgressListener,
dataChangeEventListener);
return Optional.of(incrementalSnapshotChangeEventSource);
}
}

View File

@ -503,4 +503,30 @@ public <T extends DatabaseSchema<TableId>> Object getColumnValue(ResultSet rs, i
return super.getColumnValue(rs, columnIndex, column, table, schema);
}
}
@Override
public String buildSelectWithRowLimits(TableId tableId, int limit, String projection, Optional<String> condition,
String orderBy) {
final StringBuilder sql = new StringBuilder("SELECT TOP ");
sql
.append(limit)
.append(' ')
.append(projection)
.append(" FROM ");
sql.append(quotedTableIdString(tableId));
if (condition.isPresent()) {
sql
.append(" WHERE ")
.append(condition.get());
}
sql
.append(" ORDER BY ")
.append(orderBy);
return sql.toString();
}
@Override
public String quotedTableIdString(TableId tableId) {
return "[" + tableId.schema() + "].[" + tableId.table() + "]";
}
}

View File

@ -13,6 +13,7 @@
import org.apache.kafka.connect.data.Struct;
import io.debezium.connector.SnapshotRecord;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.TableId;
@ -29,14 +30,16 @@ public class SqlServerOffsetContext implements OffsetContext {
private final Map<String, String> partition;
private boolean snapshotCompleted;
private final TransactionContext transactionContext;
private final IncrementalSnapshotContext<TableId> incrementalSnapshotContext;
/**
* The index of the current event within the current transaction.
*/
private long eventSerialNo;
public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPosition position, boolean snapshot, boolean snapshotCompleted, long eventSerialNo,
TransactionContext transactionContext) {
public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPosition position, boolean snapshot,
boolean snapshotCompleted, long eventSerialNo, TransactionContext transactionContext,
IncrementalSnapshotContext<TableId> incrementalSnapshotContext) {
partition = Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName());
sourceInfo = new SourceInfo(connectorConfig);
@ -53,10 +56,11 @@ public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPos
}
this.eventSerialNo = eventSerialNo;
this.transactionContext = transactionContext;
this.incrementalSnapshotContext = incrementalSnapshotContext;
}
public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPosition position, boolean snapshot, boolean snapshotCompleted) {
this(connectorConfig, position, snapshot, snapshotCompleted, 1, new TransactionContext());
this(connectorConfig, position, snapshot, snapshotCompleted, 1, new TransactionContext(), new IncrementalSnapshotContext<>());
}
@Override
@ -73,11 +77,11 @@ public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPos
SourceInfo.COMMIT_LSN_KEY, sourceInfo.getCommitLsn().toString());
}
else {
return transactionContext.store(Collect.hashMapOf(
return incrementalSnapshotContext.store(transactionContext.store(Collect.hashMapOf(
SourceInfo.COMMIT_LSN_KEY, sourceInfo.getCommitLsn().toString(),
SourceInfo.CHANGE_LSN_KEY,
sourceInfo.getChangeLsn() == null ? null : sourceInfo.getChangeLsn().toString(),
SourceInfo.EVENT_SERIAL_NO_KEY, eventSerialNo));
SourceInfo.EVENT_SERIAL_NO_KEY, eventSerialNo)));
}
}
@ -163,7 +167,7 @@ public OffsetContext load(Map<String, ?> offset) {
}
return new SqlServerOffsetContext(connectorConfig, TxLogPosition.valueOf(commitLsn, changeLsn), snapshot, snapshotCompleted, eventSerialNo,
TransactionContext.load(offset));
TransactionContext.load(offset), IncrementalSnapshotContext.load(offset, TableId.class));
}
}
@ -193,4 +197,14 @@ public void event(DataCollectionId tableId, Instant timestamp) {
public TransactionContext getTransactionContext() {
return transactionContext;
}
@Override
public void incrementalSnapshotEvents() {
sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL);
}
@Override
public IncrementalSnapshotContext<?> getIncrementalSnapshotContext() {
return incrementalSnapshotContext;
}
}

View File

@ -0,0 +1,86 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import java.sql.SQLException;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import io.debezium.config.Configuration.Builder;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.SkipTestRule;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotTest;
import io.debezium.util.Testing;
public class IncrementalSnapshotIT extends AbstractIncrementalSnapshotTest<SqlServerConnector> {
private SqlServerConnection connection;
@Rule
public SkipTestRule skipRule = new SkipTestRule();
@Before
public void before() throws SQLException {
TestHelper.createTestDatabase();
connection = TestHelper.testConnection();
connection.execute(
"CREATE TABLE a (pk int primary key, aa int)",
"CREATE TABLE debezium_signal (id varchar(64), type varchar(32), data varchar(2048))");
TestHelper.enableTableCdc(connection, "debezium_signal");
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
}
@After
public void after() throws SQLException {
if (connection != null) {
connection.close();
}
}
@Override
protected void populateTable() throws SQLException {
super.populateTable();
TestHelper.enableTableCdc(connection, "a");
}
@Override
protected Class<SqlServerConnector> connectorClass() {
return SqlServerConnector.class;
}
@Override
protected JdbcConnection databaseConnection() {
return connection;
}
@Override
protected String topicName() {
return "server1.dbo.a";
}
@Override
protected String tableName() {
return "testDB.dbo.a";
}
@Override
protected String signalTableName() {
return "dbo.debezium_signal";
}
@Override
protected Builder config() {
return TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.with(SqlServerConnectorConfig.SIGNAL_DATA_COLLECTION, "testDB.dbo.debezium_signal");
}
}

View File

@ -53,7 +53,7 @@ public abstract class AbstractIncrementalSnapshotTest<T extends SourceConnector>
protected void populateTable(JdbcConnection connection) throws SQLException {
connection.setAutoCommit(false);
for (int i = 0; i < ROW_COUNT; i++) {
connection.executeWithoutCommitting(String.format("INSERT INTO %s (aa) VALUES (%s)", tableName(), i));
connection.executeWithoutCommitting(String.format("INSERT INTO %s (pk, aa) VALUES (%s, %s)", tableName(), i + 1, i));
}
connection.commit();
}
@ -160,8 +160,10 @@ public void inserts() throws Exception {
try (JdbcConnection connection = databaseConnection()) {
connection.setAutoCommit(false);
for (int i = 0; i < ROW_COUNT; i++) {
connection.executeWithoutCommitting(
String.format("INSERT INTO %s (aa) VALUES (%s)", tableName(), i + ROW_COUNT));
connection.executeWithoutCommitting(String.format("INSERT INTO %s (pk, aa) VALUES (%s, %s)",
tableName(),
i + ROW_COUNT + 1,
i + ROW_COUNT));
}
connection.commit();
}