DBZ-137 Introduce streaming.fetch.size for SQL Server.

This commit is contained in:
Chris Cranford 2020-08-25 13:15:42 -04:00 committed by Gunnar Morling
parent 72d59348ed
commit 3f78f40fb4
3 changed files with 17 additions and 2 deletions

View File

@ -27,6 +27,7 @@
import com.microsoft.sqlserver.jdbc.SQLServerDriver;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
@ -208,7 +209,7 @@ public void getChangesForTables(SqlServerChangeTable[] changeTables, Lsn interva
final Lsn fromLsn = getFromLsn(changeTable, intervalFromLsn);
LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn);
preparers[idx] = statement -> {
String fetchSizeStr = config().asProperties().getProperty("incremental.fetch.size");
String fetchSizeStr = config().asProperties().getProperty(SqlServerConnectorConfig.STREAMING_FETCH_SIZE.name());
if (fetchSizeStr != null && fetchSizeStr.trim().length() > 0) {
statement.setFetchSize(Integer.parseInt(fetchSizeStr));
}

View File

@ -328,6 +328,14 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) {
.withDescription(
"If true, locks all tables to be captured as we build the schema snapshot. This will prevent from any concurrent schema changes being applied to them.");
public static final Field STREAMING_FETCH_SIZE = Field.create("streaming.fetch.size")
.withDisplayName("Streaming fetch size")
.withType(Type.INT)
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDescription("The maximum number of records that should be loaded into memory while streaming")
.withValidation(Field::isNonNegativeInteger);
private static final ConfigDefinition CONFIG_DEFINITION = HistorizedRelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
.name("SQL Server")
.type(
@ -341,7 +349,8 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) {
SNAPSHOT_MODE,
SNAPSHOT_ISOLATION_MODE,
SOURCE_TIMESTAMP_MODE,
SNAPSHOT_SKIP_LOCKS)
SNAPSHOT_SKIP_LOCKS,
STREAMING_FETCH_SIZE)
.excluding(
SCHEMA_WHITELIST,
SCHEMA_INCLUDE_LIST,

View File

@ -1899,6 +1899,11 @@ Can be used to avoid snapshot interruptions when starting multiple connectors in
|Specifies the maximum number of rows that should be read in one go from each table while taking a snapshot.
The connector will read the table contents in multiple batches of this size. Defaults to 2000.
|[[sqlserver-property-streaming-fetch-size]]<<sqlserver-property-streaming-fetch-size, `streaming.fetch.size`>>
|
|Specifies the maximum number of rows that should be read in one go from each change table while streaming.
The connector will read the change table contents in multiple batches of this size. Defaults to the JDBC driver's default fetch size.
|[[sqlserver-property-snapshot-lock-timeout-ms]]<<sqlserver-property-snapshot-lock-timeout-ms, `snapshot.lock.timeout.ms`>>
|`10000`
|An integer value that specifies the maximum amount of time (in milliseconds) to wait to obtain table locks when performing a snapshot. If table locks cannot be acquired in this time interval, the snapshot will fail (also see {link-prefix}:{link-sqlserver-connector}#sqlserver-snapshots[snapshots]). +