DBZ-3500 Re-use ResultSet to Object code

This commit is contained in:
Jiri Pechanec 2021-05-07 09:56:28 +02:00 committed by Gunnar Morling
parent a626dc149f
commit a879b7b3e1
4 changed files with 27 additions and 22 deletions

View File

@ -5,7 +5,6 @@
*/
package io.debezium.connector.postgresql;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Optional;
@ -23,7 +22,7 @@
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.Column;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
@ -219,8 +218,8 @@ protected Optional<String> getSnapshotSelect(RelationalSnapshotContext snapshotC
}
@Override
protected Object getColumnValue(ResultSet rs, int columnIndex, Column column, Table table) throws SQLException {
return jdbcConnection.getColumnValue(rs, columnIndex, column, table, schema);
protected RelationalDatabaseSchema schema() {
return schema;
}
protected void setSnapshotTransactionIsolationLevel() throws SQLException {

View File

@ -61,6 +61,7 @@
import io.debezium.util.BoundedConcurrentHashMap.Eviction;
import io.debezium.util.BoundedConcurrentHashMap.EvictionListener;
import io.debezium.util.Collect;
import io.debezium.util.ColumnUtils;
import io.debezium.util.Strings;
/**
@ -1458,4 +1459,18 @@ public <T extends DatabaseSchema<TableId>> Object getColumnValue(ResultSet rs, i
throws SQLException {
return rs.getObject(columnIndex);
}
/**
* Converts a {@link ResultSet} row to an array of Objects
*/
public <T extends DatabaseSchema<TableId>> Object[] rowToArray(Table table, T databaseSchema, ResultSet rs,
ColumnUtils.ColumnArray columnArray)
throws SQLException {
final Object[] row = new Object[columnArray.getGreatestColumnPosition()];
for (int i = 0; i < columnArray.getColumns().length; i++) {
row[columnArray.getColumns()[i].position() - 1] = getColumnValue(rs, i + 1,
columnArray.getColumns()[i], table, databaseSchema);
}
return row;
}
}

View File

@ -49,7 +49,6 @@ public class SignalBasedIncrementalSnapshotChangeEventSource<T extends DataColle
private static final Logger LOGGER = LoggerFactory.getLogger(SignalBasedIncrementalSnapshotChangeEventSource.class);
// List needs to be used as key as it implements hashCode/equals contract
private Map<Struct, Object[]> window = new LinkedHashMap<>();
private CommonConnectorConfig connectorConfig;
private JdbcConnection jdbcConnection;
@ -222,7 +221,8 @@ private void readChunk() throws InterruptedException {
if (!rs.next()) {
return null;
}
return keyFromRow(rowToArray(currentTable, rs, ColumnUtils.toArray(rs, currentTable)));
return keyFromRow(jdbcConnection.rowToArray(currentTable, databaseSchema, rs,
ColumnUtils.toArray(rs, currentTable)));
}));
if (!context.maximumKey().isPresent()) {
LOGGER.info(
@ -308,7 +308,7 @@ private void createDataEventsForTable() throws InterruptedException {
Object[] firstRow = null;
while (rs.next()) {
rows++;
final Object[] row = rowToArray(currentTable, rs, columnArray);
final Object[] row = jdbcConnection.rowToArray(currentTable, databaseSchema, rs, columnArray);
if (firstRow == null) {
firstRow = row;
}
@ -349,16 +349,6 @@ private void tableScanCompleted() {
totalRowsScanned = 0;
}
// Extract to JdbcConnection, same as in RelationalSnapshotChangeEventSource
protected Object[] rowToArray(Table table, ResultSet rs, ColumnUtils.ColumnArray columnArray) throws SQLException {
final Object[] row = new Object[columnArray.getGreatestColumnPosition()];
for (int i = 0; i < columnArray.getColumns().length; i++) {
row[columnArray.getColumns()[i].position() - 1] = jdbcConnection.getColumnValue(rs, i + 1,
columnArray.getColumns()[i], table, databaseSchema);
}
return row;
}
protected PreparedStatement readTableChunkStatement(String sql) throws SQLException {
final PreparedStatement statement = jdbcConnection.readTablePreparedStatement(connectorConfig, sql,
OptionalLong.empty());

View File

@ -351,10 +351,7 @@ private void createDataEventsForTable(ChangeEventSourceContext sourceContext, Re
}
rows++;
final Object[] row = new Object[columnArray.getGreatestColumnPosition()];
for (int i = 0; i < columnArray.getColumns().length; i++) {
row[columnArray.getColumns()[i].position() - 1] = getColumnValue(rs, i + 1, columnArray.getColumns()[i], table);
}
final Object[] row = jdbcConnection.rowToArray(table, schema(), rs, columnArray);
snapshotContext.lastRecordInTable = !rs.next();
if (logTimer.expired()) {
@ -450,8 +447,12 @@ protected String enhanceOverriddenSelect(RelationalSnapshotContext snapshotConte
// scn xyz")
protected abstract Optional<String> getSnapshotSelect(RelationalSnapshotContext snapshotContext, TableId tableId);
protected RelationalDatabaseSchema schema() {
return schema;
}
protected Object getColumnValue(ResultSet rs, int columnIndex, Column column, Table table) throws SQLException {
return jdbcConnection.getColumnValue(rs, columnIndex, column, table, schema);
return jdbcConnection.getColumnValue(rs, columnIndex, column, table, schema());
}
/**