DBZ-3473 Support for cursor-based fetching

This commit is contained in:
Jiri Pechanec 2021-05-05 09:26:30 +02:00
parent c53785dde7
commit 3d940ca14a
4 changed files with 29 additions and 80 deletions

View File

@ -7,6 +7,7 @@
package io.debezium.connector.mysql;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
@ -28,8 +29,11 @@
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
import io.debezium.connector.mysql.legacy.MySqlJdbcContext.DatabaseLocales;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.schema.DatabaseSchema;
import io.debezium.util.Strings;
/**
@ -50,20 +54,27 @@ public class MySqlConnection extends JdbcConnection {
private final Map<String, String> originalSystemProperties = new HashMap<>();
private final MySqlConnectionConfiguration connectionConfig;
private final MysqlFieldReader mysqlFieldReader;
/**
* Creates a new connection using the supplied configuration.
*
* @param config {@link Configuration} instance, may not be null.
* @param sourceTimestampMode strategy for populating {@code source.ts_ms}.
* @param config
* {@link Configuration} instance, may not be null.
* @param valueConverters
* {@link SqlServerValueConverters} instance
* @param connectionConfig {@link MySqlConnectionConfiguration} instance, may not be null.
* @param fieldReader binary or text protocol based readers
*/
public MySqlConnection(MySqlConnectionConfiguration connectionConfig) {
public MySqlConnection(MySqlConnectionConfiguration connectionConfig, MysqlFieldReader fieldReader) {
super(connectionConfig.config(), connectionConfig.factory());
this.connectionConfig = connectionConfig;
this.mysqlFieldReader = fieldReader;
}
/**
* Creates a new connection using the supplied configuration.
*
* @param connectionConfig {@link MySqlConnectionConfiguration} instance, may not be null.
*/
public MySqlConnection(MySqlConnectionConfiguration connectionConfig) {
this(connectionConfig, new MysqlTextProtocolFieldReader());
}
@Override
@ -552,4 +563,11 @@ public EventProcessingFailureHandlingMode inconsistentSchemaHandlingMode() {
return EventProcessingFailureHandlingMode.parse(mode);
}
}
@Override
public <T extends DatabaseSchema<TableId>> Object getColumnValue(ResultSet rs, int columnIndex, Column column,
Table table, T schema)
throws SQLException {
return mysqlFieldReader.readField(rs, columnIndex, column, table);
}
}

View File

@ -77,7 +77,9 @@ public ChangeEventSourceCoordinator start(Configuration config) {
.withDefault("database.useCursorFetch", connectorConfig.useCursorFetch())
.build();
connection = new MySqlConnection(new MySqlConnectionConfiguration(config));
connection = new MySqlConnection(new MySqlConnectionConfiguration(config),
connectorConfig.useCursorFetch() ? new MysqlBinaryProtocolFieldReader()
: new MysqlTextProtocolFieldReader());
try {
connection.setAutoCommit(false);
}

View File

@ -5,14 +5,11 @@
*/
package io.debezium.connector.mysql;
import java.io.UnsupportedEncodingException;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@ -38,7 +35,6 @@
import io.debezium.function.BlockingConsumer;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.Column;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.RelationalTableFilters;
import io.debezium.relational.Table;
@ -64,7 +60,6 @@ public class MySqlSnapshotChangeEventSource extends RelationalSnapshotChangeEven
private final List<SchemaChangeEvent> schemaEvents = new ArrayList<>();
private Set<TableId> delayedSchemaSnapshotTables = Collections.emptySet();
private final BlockingConsumer<Function<SourceRecord, SourceRecord>> lastEventProcessor;
private final MysqlFieldReader mysqlFieldReader;
public MySqlSnapshotChangeEventSource(MySqlConnectorConfig connectorConfig, MySqlOffsetContext previousOffset, MySqlConnection connection,
MySqlDatabaseSchema schema, EventDispatcher<TableId> dispatcher, Clock clock,
@ -78,7 +73,6 @@ public MySqlSnapshotChangeEventSource(MySqlConnectorConfig connectorConfig, MySq
this.previousOffset = previousOffset;
this.databaseSchema = schema;
this.lastEventProcessor = lastEventProcessor;
this.mysqlFieldReader = connectorConfig.useCursorFetch() ? new MysqlBinaryProtocolFieldReader() : new MysqlTextProtocolFieldReader();
}
@Override
@ -403,71 +397,6 @@ protected Optional<String> getSnapshotSelect(RelationalSnapshotContext snapshotC
return Optional.of(String.format("SELECT * FROM `%s`.`%s`", tableId.catalog(), tableId.table()));
}
@Override
protected Object getColumnValue(ResultSet rs, int columnIndex, Column column, Table table) throws SQLException {
// TODO Move to connection to support cursor fetch for incremental snapshot too
return mysqlFieldReader.readField(rs, columnIndex, column, table);
}
/**
* As MySQL connector/J implementation is broken for MySQL type "TIME" we have to use a binary-ish workaround
*
* @see https://issues.jboss.org/browse/DBZ-342
*/
private Object readTimeField(ResultSet rs, int fieldNo) throws SQLException {
Blob b = rs.getBlob(fieldNo);
if (b == null) {
return null; // Don't continue parsing time field if it is null
}
try {
return MySqlValueConverters.stringToDuration(new String(b.getBytes(1, (int) (b.length())), "UTF-8"));
}
catch (UnsupportedEncodingException e) {
LOGGER.error("Could not read MySQL TIME value as UTF-8");
throw new RuntimeException(e);
}
}
/**
* In non-string mode the date field can contain zero in any of the date part which we need to handle as all-zero
*
*/
private Object readDateField(ResultSet rs, int fieldNo, Column column, Table table) throws SQLException {
Blob b = rs.getBlob(fieldNo);
if (b == null) {
return null; // Don't continue parsing date field if it is null
}
try {
return MySqlValueConverters.stringToLocalDate(new String(b.getBytes(1, (int) (b.length())), "UTF-8"), column, table);
}
catch (UnsupportedEncodingException e) {
LOGGER.error("Could not read MySQL TIME value as UTF-8");
throw new RuntimeException(e);
}
}
/**
* In non-string mode the time field can contain zero in any of the date part which we need to handle as all-zero
*
*/
private Object readTimestampField(ResultSet rs, int fieldNo, Column column, Table table) throws SQLException {
Blob b = rs.getBlob(fieldNo);
if (b == null) {
return null; // Don't continue parsing timestamp field if it is null
}
try {
return MySqlValueConverters.containsZeroValuesInDatePart((new String(b.getBytes(1, (int) (b.length())), "UTF-8")), column, table) ? null
: rs.getTimestamp(fieldNo, Calendar.getInstance());
}
catch (UnsupportedEncodingException e) {
LOGGER.error("Could not read MySQL TIME value as UTF-8");
throw new RuntimeException(e);
}
}
private boolean isGloballyLocked() {
return globalLockAcquiredAt != -1;
}

View File

@ -451,7 +451,7 @@ protected String enhanceOverriddenSelect(RelationalSnapshotContext snapshotConte
protected abstract Optional<String> getSnapshotSelect(RelationalSnapshotContext snapshotContext, TableId tableId);
protected Object getColumnValue(ResultSet rs, int columnIndex, Column column, Table table) throws SQLException {
return getColumnValue(rs, columnIndex, column);
return jdbcConnection.getColumnValue(rs, columnIndex, column, table, schema);
}
@Deprecated