DBZ-2495 Updating to handle captured_column_list without specify column.exclude.list

This commit is contained in:
James Gormley 2020-09-18 07:43:20 -04:00 committed by Gunnar Morling
parent c67c5df876
commit 0d46ec47da
12 changed files with 344 additions and 106 deletions

View File

@ -5,8 +5,12 @@
*/
package io.debezium.connector.sqlserver;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;
import io.debezium.relational.ChangeTable;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
/**
@ -20,6 +24,7 @@
public class SqlServerChangeTable extends ChangeTable {
private static final String CDC_SCHEMA = "cdc";
private static final Pattern BRACKET_PATTERN = Pattern.compile("[\\[\\]]");
/**
* A LSN from which the data in the change table are relevant
@ -31,19 +36,19 @@ public class SqlServerChangeTable extends ChangeTable {
*/
private Lsn stopLsn;
/**
* The table from which the changes are captured
*/
private Table sourceTable;
private List<String> capturedColumnList;
public SqlServerChangeTable(TableId sourceTableId, String captureInstance, int changeTableObjectId, Lsn startLsn, Lsn stopLsn) {
public SqlServerChangeTable(TableId sourceTableId, String captureInstance, int changeTableObjectId, Lsn startLsn, Lsn stopLsn,
String capturedColumnListString) {
super(captureInstance, sourceTableId, resolveChangeTableId(sourceTableId, captureInstance), changeTableObjectId);
this.startLsn = startLsn;
this.stopLsn = stopLsn;
this.capturedColumnList = Arrays.asList(BRACKET_PATTERN.matcher(Optional.ofNullable(capturedColumnListString).orElse(""))
.replaceAll("").split(", "));
}
public SqlServerChangeTable(String captureInstance, int changeTableObjectId, Lsn startLsn, Lsn stopLsn) {
this(null, captureInstance, changeTableObjectId, startLsn, stopLsn);
this(null, captureInstance, changeTableObjectId, startLsn, stopLsn, null);
}
public Lsn getStartLsn() {
@ -58,12 +63,8 @@ public void setStopLsn(Lsn stopLsn) {
this.stopLsn = stopLsn;
}
public Table getSourceTable() {
return sourceTable;
}
public void setSourceTable(Table sourceTable) {
this.sourceTable = sourceTable;
public List<String> getCapturedColumnList() {
return capturedColumnList;
}
@Override

View File

@ -12,7 +12,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.concurrent.atomic.AtomicInteger;
import io.debezium.jdbc.JdbcConnection.ResultSetMapper;
import io.debezium.pipeline.source.spi.ChangeTableResultSet;
@ -48,7 +48,6 @@ public SqlServerChangeTablePointer(SqlServerChangeTable changeTable, ResultSet r
public SqlServerChangeTablePointer(SqlServerChangeTable changeTable, ResultSet resultSet, int columnDataOffset) {
super(changeTable, resultSet, columnDataOffset);
// Store references to these because we can't get them from our superclass
// this.changeTable = changeTable;
this.resultSet = resultSet;
this.columnDataOffset = columnDataOffset;
}
@ -102,34 +101,26 @@ public Object[] getData() throws SQLException {
* a subset of columns
*/
private ResultSetMapper<Object[]> createResultSetMapper(Table table) throws SQLException {
final List<String> sourceTableColumns = table.columns()
.stream()
.map(Column::name)
.collect(Collectors.toList());
Map<String, Column> sourceTableColumns = new HashMap<>(table.columns().size());
AtomicInteger greatestColumnPosition = new AtomicInteger(0);
for (Column column : table.columns()) {
sourceTableColumns.put(column.name(), column);
greatestColumnPosition.set(greatestColumnPosition.get() < column.position()
? column.position()
: greatestColumnPosition.get());
}
final List<String> resultColumns = getResultColumnNames();
final int sourceColumnCount = sourceTableColumns.size();
final int resultColumnCount = resultColumns.size();
if (sourceTableColumns.equals(resultColumns)) {
return resultSet -> {
final Object[] data = new Object[sourceColumnCount];
for (int i = 0; i < sourceColumnCount; i++) {
data[i] = getColumnData(resultSet, columnDataOffset + i);
}
return data;
};
}
else {
final IndicesMapping indicesMapping = new IndicesMapping(sourceTableColumns, resultColumns);
return resultSet -> {
final Object[] data = new Object[sourceColumnCount];
for (int i = 0; i < resultColumnCount; i++) {
int index = indicesMapping.getSourceTableColumnIndex(i);
data[index] = getColumnData(resultSet, columnDataOffset + i);
}
return data;
};
}
final IndicesMapping indicesMapping = new IndicesMapping(sourceTableColumns, resultColumns);
return resultSet -> {
final Object[] data = new Object[greatestColumnPosition.get()];
for (int i = 0; i < resultColumnCount; i++) {
int index = indicesMapping.getSourceTableColumnIndex(i);
data[index] = getColumnData(resultSet, columnDataOffset + i);
}
return data;
};
}
private List<String> getResultColumnNames() throws SQLException {
@ -145,11 +136,11 @@ private class IndicesMapping {
private final Map<Integer, Integer> mapping;
IndicesMapping(List<String> sourceTableColumns, List<String> captureInstanceColumns) {
IndicesMapping(Map<String, Column> sourceTableColumns, List<String> captureInstanceColumns) {
this.mapping = new HashMap<>(sourceTableColumns.size(), 1.0F);
for (int i = 0; i < captureInstanceColumns.size(); ++i) {
mapping.put(i, sourceTableColumns.indexOf(captureInstanceColumns.get(i)));
mapping.put(i, sourceTableColumns.get(captureInstanceColumns.get(i)).position() - 1);
}
}

View File

@ -338,7 +338,8 @@ public Set<SqlServerChangeTable> listOfChangeTables() throws SQLException {
rs.getString(3),
rs.getInt(4),
Lsn.valueOf(rs.getBytes(6)),
Lsn.valueOf(rs.getBytes(7))));
Lsn.valueOf(rs.getBytes(7)),
rs.getString(15)));
}
return changeTables;
});
@ -375,13 +376,18 @@ public Table getTableSchemaFromTable(SqlServerChangeTable changeTable) throws SQ
changeTable.getSourceTableId().table(),
null)) {
while (rs.next()) {
readTableColumn(rs, changeTable.getSourceTableId(), null).ifPresent(ce -> columns.add(ce.create()));
readTableColumn(rs, changeTable.getSourceTableId(), null).ifPresent(ce -> {
// Filter out columns not included in the change table.
if (changeTable.getCapturedColumnList().contains(ce.name())) {
columns.add(ce.create());
}
});
}
}
final List<String> pkColumnNames = readPrimaryKeyOrUniqueIndexNames(metadata, changeTable.getSourceTableId());
Collections.sort(columns);
return Table.editor()
return Table.editorSkippableColumns()
.tableId(changeTable.getSourceTableId())
.addColumns(columns)
.setPrimaryKeyNames(pkColumnNames)

View File

@ -12,7 +12,8 @@
import java.sql.Savepoint;
import java.sql.Statement;
import java.sql.Types;
import java.util.Objects;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -44,6 +45,7 @@ public class SqlServerSnapshotChangeEventSource extends RelationalSnapshotChange
private final SqlServerConnectorConfig connectorConfig;
private final SqlServerConnection jdbcConnection;
private final SqlServerDatabaseSchema sqlServerDatabaseSchema;
private Map<TableId, SqlServerChangeTable> changeTables;
public SqlServerSnapshotChangeEventSource(SqlServerConnectorConfig connectorConfig, SqlServerOffsetContext previousOffset, SqlServerConnection jdbcConnection,
SqlServerDatabaseSchema schema, EventDispatcher<TableId> dispatcher, Clock clock,
@ -185,6 +187,25 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext, Relati
connectorConfig.getTableFilters().dataCollectionFilter(),
null,
false);
// Save changeTables for sql select later.
changeTables = jdbcConnection.listOfChangeTables().stream()
.collect(Collectors.toMap(SqlServerChangeTable::getSourceTableId, changeTable -> changeTable,
(changeTable1, changeTable2) -> changeTable1.getStartLsn().compareTo(changeTable2.getStartLsn()) > 0
? changeTable1
: changeTable2));
// Update table schemas to only include columns that are also included in the cdc tables.
changeTables.forEach((tableId, sqlServerChangeTable) -> {
Table sourceTable = snapshotContext.tables.forTable(tableId);
// SourceTable will be null for that tables are excluded in the configuration, but have cdc enabled.
if (sourceTable != null) {
List<Column> cdcEnabledSourceColumns = sourceTable.filterColumns(
column -> sqlServerChangeTable.getCapturedColumnList().contains(column.name()));
snapshotContext.tables.overwriteTable(sourceTable.id(), cdcEnabledSourceColumns,
sourceTable.primaryKeyColumnNames(), sourceTable.defaultCharsetName());
}
});
}
}
@ -223,41 +244,41 @@ protected void complete(SnapshotContext snapshotContext) {
@Override
protected Optional<String> getSnapshotSelect(RelationalSnapshotContext snapshotContext, TableId tableId) {
String modifiedColumns = checkExcludedColumns(tableId);
if (modifiedColumns != null) {
return Optional.of(String.format("SELECT %s FROM [%s].[%s]", modifiedColumns, tableId.schema(), tableId.table()));
}
return Optional.of(String.format("SELECT * FROM [%s].[%s]", tableId.schema(), tableId.table()));
return Optional.of(String.format("SELECT %s FROM [%s].[%s]", modifiedColumns, tableId.schema(), tableId.table()));
}
@Override
protected String enhanceOverriddenSelect(RelationalSnapshotContext snapshotContext, String overriddenSelect, TableId tableId) {
String modifiedColumns = checkExcludedColumns(tableId);
if (modifiedColumns != null) {
overriddenSelect = overriddenSelect.replaceAll("\\*", modifiedColumns);
}
return overriddenSelect;
return overriddenSelect.replaceAll("\\*", modifiedColumns);
}
private String checkExcludedColumns(TableId tableId) {
String modifiedColumns = null;
String excludedColumnStr = connectorConfig.getTableFilters().getExcludeColumns();
if (Objects.nonNull(excludedColumnStr)
&& excludedColumnStr.trim().length() > 0
&& excludedColumnStr.contains(tableId.table())) {
Table table = sqlServerDatabaseSchema.tableFor(tableId);
modifiedColumns = table.retrieveColumnNames().stream()
.map(s -> {
StringBuilder sb = new StringBuilder();
if (!s.contains(tableId.table())) {
sb.append(tableId.table()).append(".").append(s);
}
else {
sb.append(s);
}
return sb.toString();
}).collect(Collectors.joining(","));
Table table = sqlServerDatabaseSchema.tableFor(tableId);
return table.retrieveColumnNames().stream()
.filter(columnName -> filterChangeTableColumns(tableId, columnName))
.filter(columnName -> connectorConfig.getColumnFilter().matches(tableId.catalog(), tableId.schema(), tableId.table(), columnName))
.map(columnName -> {
StringBuilder sb = new StringBuilder();
if (!columnName.contains(tableId.table())) {
sb.append("[").append(tableId.table()).append("]")
.append(".[").append(columnName).append("]");
}
else {
sb.append("[").append(columnName).append("]");
}
return sb.toString();
}).collect(Collectors.joining(","));
}
private boolean filterChangeTableColumns(TableId tableId, String columnName) {
SqlServerChangeTable changeTable = changeTables.get(tableId);
if (changeTable != null) {
return changeTable.getCapturedColumnList().contains(columnName);
}
return modifiedColumns;
// ChangeTable will be null if cdc has not been enabled for it yet.
// Return true to allow columns to be captured.
return true;
}
@Override

View File

@ -108,7 +108,7 @@ public void takeSnapshotWithOverrides() throws Exception {
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
SourceRecords records = consumeRecordsByTopic(INITIAL_RECORDS_PER_TABLE + INITIAL_RECORDS_PER_TABLE + INITIAL_RECORDS_PER_TABLE / 2);
SourceRecords records = consumeRecordsByTopic(INITIAL_RECORDS_PER_TABLE + (INITIAL_RECORDS_PER_TABLE + INITIAL_RECORDS_PER_TABLE) / 2);
List<SourceRecord> table1 = records.recordsForTopic("server1.dbo.table1");
List<SourceRecord> table2 = records.recordsForTopic("server1.dbo.table2");
List<SourceRecord> table3 = records.recordsForTopic("server1.dbo.table3");

View File

@ -13,6 +13,7 @@
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import org.fest.assertions.Assertions;
import org.junit.Before;
@ -177,9 +178,17 @@ public void shouldProperlyGetDefaultColumnValues() throws Exception {
// and issue a test call to a CDC wrapper function
Thread.sleep(5_000); // Need to wait to make sure the min_lsn is available
String capturedColumnList = String.join(", ",
Arrays.asList("int_no_default_not_null", "int_no_default", "bigint_column", "int_column", "smallint_column", "tinyint_column",
"bit_column", "decimal_column", "numeric_column", "money_column", "smallmoney_column", "float_column", "real_column",
"date_column", "datetime_column", "datetime2_column", "datetime2_0_column", "datetime2_1_column", "datetime2_2_column", "datetime2_3_column",
"datetime2_4_column", "datetime2_5_column", "datetime2_6_column", "datetime2_7_column", "datetimeoffset_column", "smalldatetime_column",
"time_column", "time_0_column", "time_1_column", "time_2_column", "time_3_column", "time_4_column", "time_5_column", "time_6_column",
"time_7_column", "char_column", "varchar_column", "text_column", "nchar_column", "nvarchar_column", "ntext_column", "binary_column",
"varbinary_column", "image_column"));
SqlServerChangeTable changeTable = new SqlServerChangeTable(new TableId("testDB", "dbo", "table_with_defaults"),
null, 0, null, null);
null, 0, null, null, "[" + capturedColumnList + "]");
Table table = connection.getTableSchemaFromTable(changeTable);
assertColumnHasNotDefaultValue(table, "int_no_default_not_null");

View File

@ -8,6 +8,7 @@
import static io.debezium.connector.sqlserver.util.TestHelper.TYPE_LENGTH_PARAMETER_KEY;
import static io.debezium.connector.sqlserver.util.TestHelper.TYPE_NAME_PARAMETER_KEY;
import static io.debezium.connector.sqlserver.util.TestHelper.TYPE_SCALE_PARAMETER_KEY;
import static io.debezium.connector.sqlserver.util.TestHelper.waitForMaxLsnAvailable;
import static org.fest.assertions.Assertions.assertThat;
import static org.fest.assertions.MapAssert.entry;
import static org.junit.Assert.assertNull;
@ -1315,6 +1316,165 @@ record = tableB.get(0);
stopConnector();
}
@Test
@FixFor("DBZ-2522")
public void whenCaptureInstanceExcludesColumnsExpectSnapshotAndStreamingToExcludeColumns() throws Exception {
connection.execute(
"CREATE TABLE excluded_column_table_a (id int, name varchar(30), amount integer primary key(id))");
connection.execute("INSERT INTO excluded_column_table_a VALUES(10, 'a name', 100)");
TestHelper.enableTableCdc(connection, "excluded_column_table_a", "dbo_excluded_column_table_a",
Arrays.asList("id", "name"));
final Configuration config = TestHelper.defaultConfig()
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted("sql_server", "server1");
connection.execute("INSERT INTO excluded_column_table_a VALUES(11, 'some_name', 120)");
final SourceRecords records = consumeRecordsByTopic(3);
final List<SourceRecord> tableA = records.recordsForTopic("server1.dbo.excluded_column_table_a");
Schema expectedSchemaA = SchemaBuilder.struct()
.optional()
.name("server1.dbo.excluded_column_table_a.Value")
.field("id", Schema.INT32_SCHEMA)
.field("name", Schema.OPTIONAL_STRING_SCHEMA)
.build();
Struct expectedValueSnapshot = new Struct(expectedSchemaA)
.put("id", 10)
.put("name", "a name");
Struct expectedValueStreaming = new Struct(expectedSchemaA)
.put("id", 11)
.put("name", "some_name");
Assertions.assertThat(tableA).hasSize(2);
SourceRecordAssert.assertThat(tableA.get(0))
.valueAfterFieldSchemaIsEqualTo(expectedSchemaA)
.valueAfterFieldIsEqualTo(expectedValueSnapshot);
SourceRecordAssert.assertThat(tableA.get(1))
.valueAfterFieldSchemaIsEqualTo(expectedSchemaA)
.valueAfterFieldIsEqualTo(expectedValueStreaming);
stopConnector();
}
@Test
@FixFor("DBZ-2522")
public void whenMultipleCaptureInstancesExcludesColumnsExpectLatestCDCTableUtilized() throws Exception {
connection.execute(
"CREATE TABLE excluded_column_table_a (id int, name varchar(30), amount integer primary key(id))");
connection.execute("INSERT INTO excluded_column_table_a VALUES(10, 'a name', 100)");
TestHelper.enableTableCdc(connection, "excluded_column_table_a", "dbo_excluded_column_table_a",
Arrays.asList("id", "name"));
connection.execute("ALTER TABLE excluded_column_table_a ADD note varchar(30)");
TestHelper.enableTableCdc(connection, "excluded_column_table_a", "dbo_excluded_column_table_a_2",
Arrays.asList("id", "name", "note"));
final Configuration config = TestHelper.defaultConfig()
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted("sql_server", "server1");
connection.execute("INSERT INTO excluded_column_table_a VALUES(11, 'some_name', 120, 'a note')");
final SourceRecords records = consumeRecordsByTopic(3);
final List<SourceRecord> tableA = records.recordsForTopic("server1.dbo.excluded_column_table_a");
Schema expectedSchema = SchemaBuilder.struct()
.optional()
.name("server1.dbo.excluded_column_table_a.Value")
.field("id", Schema.INT32_SCHEMA)
.field("name", Schema.OPTIONAL_STRING_SCHEMA)
.field("note", Schema.OPTIONAL_STRING_SCHEMA)
.build();
Struct expectedValueSnapshot = new Struct(expectedSchema)
.put("id", 10)
.put("name", "a name")
.put("note", null);
Struct expectedValueStreaming = new Struct(expectedSchema)
.put("id", 11)
.put("name", "some_name")
.put("note", "a note");
Assertions.assertThat(tableA).hasSize(2);
SourceRecordAssert.assertThat(tableA.get(0))
.valueAfterFieldSchemaIsEqualTo(expectedSchema)
.valueAfterFieldIsEqualTo(expectedValueSnapshot);
SourceRecordAssert.assertThat(tableA.get(1))
.valueAfterFieldSchemaIsEqualTo(expectedSchema)
.valueAfterFieldIsEqualTo(expectedValueStreaming);
stopConnector();
}
@Test
@FixFor("DBZ-2522")
public void whenCaptureInstanceExcludesColumnsAndColumnsRenamedExpectNoErrors() throws Exception {
connection.execute(
"CREATE TABLE excluded_column_table_a (id int, name varchar(30), amount integer primary key(id))");
connection.execute("INSERT INTO excluded_column_table_a VALUES(10, 'a name', 100)");
TestHelper.enableTableCdc(connection, "excluded_column_table_a", "dbo_excluded_column_table_a",
Arrays.asList("id", "name"));
final Configuration config = TestHelper.defaultConfig()
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted("sql_server", "server1");
waitForMaxLsnAvailable(connection);
TestHelper.disableTableCdc(connection, "excluded_column_table_a");
connection.execute("EXEC sp_RENAME 'excluded_column_table_a.name', 'first_name', 'COLUMN'");
TestHelper.enableTableCdc(connection, "excluded_column_table_a", "dbo_excluded_column_table_a",
Arrays.asList("id", "first_name"));
waitForMaxLsnAvailable(connection);
connection.execute("INSERT INTO excluded_column_table_a VALUES(11, 'some_name', 120)");
final SourceRecords records = consumeRecordsByTopic(3);
final List<SourceRecord> tableA = records.recordsForTopic("server1.dbo.excluded_column_table_a");
Schema expectedSchema1 = SchemaBuilder.struct()
.optional()
.name("server1.dbo.excluded_column_table_a.Value")
.field("id", Schema.INT32_SCHEMA)
.field("name", Schema.OPTIONAL_STRING_SCHEMA)
.build();
Struct expectedValueSnapshot = new Struct(expectedSchema1)
.put("id", 10)
.put("name", "a name");
Schema expectedSchema2 = SchemaBuilder.struct()
.optional()
.name("server1.dbo.excluded_column_table_a.Value")
.field("id", Schema.INT32_SCHEMA)
.field("first_name", Schema.OPTIONAL_STRING_SCHEMA)
.build();
Struct expectedValueStreaming = new Struct(expectedSchema2)
.put("id", 11)
.put("first_name", "some_name");
Assertions.assertThat(tableA).hasSize(2);
SourceRecordAssert.assertThat(tableA.get(0))
.valueAfterFieldSchemaIsEqualTo(expectedSchema1)
.valueAfterFieldIsEqualTo(expectedValueSnapshot);
SourceRecordAssert.assertThat(tableA.get(1))
.valueAfterFieldSchemaIsEqualTo(expectedSchema2)
.valueAfterFieldIsEqualTo(expectedValueStreaming);
stopConnector();
}
@Test
@FixFor("DBZ-1068")
public void excludeColumnWhenCaptureInstanceExcludesColumns() throws Exception {
@ -1348,8 +1508,8 @@ public void excludeColumnWhenCaptureInstanceExcludesColumns() throws Exception {
Assertions.assertThat(tableA).hasSize(1);
SourceRecordAssert.assertThat(tableA.get(0))
.valueAfterFieldIsEqualTo(expectedValueA)
.valueAfterFieldSchemaIsEqualTo(expectedSchemaA);
.valueAfterFieldSchemaIsEqualTo(expectedSchemaA)
.valueAfterFieldIsEqualTo(expectedValueA);
stopConnector();
}
@ -1359,20 +1519,21 @@ public void excludeColumnWhenCaptureInstanceExcludesColumns() throws Exception {
public void excludeColumnWhenCaptureInstanceExcludesColumnInMiddleOfTable() throws Exception {
connection.execute(
"CREATE TABLE exclude_list_column_table_a (id int, amount integer, name varchar(30), primary key(id))");
connection.execute("INSERT INTO exclude_list_column_table_a VALUES(10, 100, 'a name')");
TestHelper.enableTableCdc(connection, "exclude_list_column_table_a", "dbo_exclude_list_column_table_a",
Arrays.asList("id", "name"));
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.with(SqlServerConnectorConfig.COLUMN_EXCLUDE_LIST, "dbo.exclude_list_column_table_a.amount")
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted("sql_server", "server1");
connection.execute("INSERT INTO exclude_list_column_table_a VALUES(11, 120, 'some_name')");
connection.execute("INSERT INTO exclude_list_column_table_a VALUES(10, 120, 'some_name')");
final SourceRecords records = consumeRecordsByTopic(1);
final SourceRecords records = consumeRecordsByTopic(3); // 3 as tablea has row insert in the before
final List<SourceRecord> tableA = records.recordsForTopic("server1.dbo.exclude_list_column_table_a");
Schema expectedSchemaA = SchemaBuilder.struct()
@ -1381,14 +1542,20 @@ public void excludeColumnWhenCaptureInstanceExcludesColumnInMiddleOfTable() thro
.field("id", Schema.INT32_SCHEMA)
.field("name", Schema.OPTIONAL_STRING_SCHEMA)
.build();
Struct expectedValueA = new Struct(expectedSchemaA)
Struct expectedValue1 = new Struct(expectedSchemaA)
.put("id", 10)
.put("name", "a name");
Struct expectedValue2 = new Struct(expectedSchemaA)
.put("id", 11)
.put("name", "some_name");
Assertions.assertThat(tableA).hasSize(1);
Assertions.assertThat(tableA).hasSize(2);
SourceRecordAssert.assertThat(tableA.get(0))
.valueAfterFieldIsEqualTo(expectedValueA)
.valueAfterFieldSchemaIsEqualTo(expectedSchemaA);
.valueAfterFieldSchemaIsEqualTo(expectedSchemaA)
.valueAfterFieldIsEqualTo(expectedValue1);
SourceRecordAssert.assertThat(tableA.get(1))
.valueAfterFieldSchemaIsEqualTo(expectedSchemaA)
.valueAfterFieldIsEqualTo(expectedValue2);
stopConnector();
}
@ -1466,8 +1633,8 @@ public void excludeMultipleColumnsWhenCaptureInstanceExcludesSingleColumn() thro
Assertions.assertThat(tableA).hasSize(1);
SourceRecordAssert.assertThat(tableA.get(0))
.valueAfterFieldIsEqualTo(expectedValueA)
.valueAfterFieldSchemaIsEqualTo(expectedSchemaA);
.valueAfterFieldSchemaIsEqualTo(expectedSchemaA)
.valueAfterFieldIsEqualTo(expectedValueA);
stopConnector();
}

View File

@ -318,8 +318,15 @@ private void createDataEventsForTable(ChangeEventSourceContext sourceContext, Re
try (Statement statement = readTableStatement();
ResultSet rs = statement.executeQuery(selectStatement.get())) {
Column[] columns = getColumnsForResultSet(table, rs);
final int numColumns = table.columns().size();
ResultSetMetaData metaData = rs.getMetaData();
Column[] columns = new Column[metaData.getColumnCount()];
int greatestColumnPosition = 0;
for (int i = 0; i < columns.length; i++) {
columns[i] = table.columnWithName(metaData.getColumnName(i + 1));
greatestColumnPosition = greatestColumnPosition < columns[i].position()
? columns[i].position()
: greatestColumnPosition;
}
long rows = 0;
Timer logTimer = getTableScanLogTimer();
snapshotContext.lastRecordInTable = false;
@ -331,9 +338,9 @@ private void createDataEventsForTable(ChangeEventSourceContext sourceContext, Re
}
rows++;
final Object[] row = new Object[numColumns];
for (int i = 0; i < numColumns; i++) {
row[i] = getColumnValue(rs, i + 1, columns[i]);
final Object[] row = new Object[greatestColumnPosition];
for (int i = 0; i < columns.length; i++) {
row[columns[i].position() - 1] = getColumnValue(rs, i + 1, columns[i]);
}
snapshotContext.lastRecordInTable = !rs.next();
@ -414,17 +421,6 @@ protected String enhanceOverriddenSelect(RelationalSnapshotContext snapshotConte
// scn xyz")
protected abstract Optional<String> getSnapshotSelect(RelationalSnapshotContext snapshotContext, TableId tableId);
private Column[] getColumnsForResultSet(Table table, ResultSet rs) throws SQLException {
ResultSetMetaData metaData = rs.getMetaData();
Column[] columns = new Column[metaData.getColumnCount()];
for (int i = 0; i < columns.length; i++) {
columns[i] = table.columnWithName(metaData.getColumnName(i + 1));
}
return columns;
}
protected Object getColumnValue(ResultSet rs, int columnIndex, Column column) throws SQLException {
return rs.getObject(columnIndex);
}

View File

@ -22,10 +22,14 @@ public interface Table {
*
* @return the editor; never null
*/
public static TableEditor editor() {
static TableEditor editor() {
return new TableEditorImpl();
}
static TableEditor editorSkippableColumns() {
return new TableEditorSkippableColumnsImpl();
}
/**
* Get the identifier for this table.
* @return the identifier; never null

View File

@ -13,7 +13,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
final class TableEditorImpl implements TableEditor {
class TableEditorImpl implements TableEditor {
private TableId id;
private LinkedHashMap<String, Column> sortedColumns = new LinkedHashMap<>();
@ -256,7 +256,7 @@ protected void updatePositions() {
protected boolean positionsAreValid() {
AtomicInteger position = new AtomicInteger(1);
return sortedColumns.values().stream().allMatch(defn -> defn.position() == position.getAndIncrement());
return sortedColumns.values().stream().allMatch(defn -> defn.position() >= position.getAndSet(defn.position() + 1));
}
@Override
@ -276,4 +276,8 @@ public Table create() {
});
return new TableImpl(id, columns, primaryKeyColumnNames(), defaultCharsetName);
}
public LinkedHashMap<String, Column> getSortedColumns() {
return sortedColumns;
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational;
import java.util.concurrent.atomic.AtomicInteger;
final class TableEditorSkippableColumnsImpl extends TableEditorImpl {
/**
* Trusts inputted column position to be correct.
*/
@Override
protected void add(Column defn) {
if (defn != null) {
getSortedColumns().put(defn.name().toLowerCase(), defn);
}
assert positionsAreValid();
}
/**
* Do not automatically update columns.
*/
@Override
protected void updatePositions() {
// Do nothing and allow
}
/**
* Verify columns are incrementing by some amount/unique.
*/
@Override
protected boolean positionsAreValid() {
AtomicInteger position = new AtomicInteger(1);
return getSortedColumns().values().stream().allMatch(defn -> defn.position() >= position.getAndSet(defn.position() + 1));
}
}

View File

@ -82,7 +82,7 @@ public static void enable() {
}
public static void disable() {
enabled = true;
enabled = false;
}
public static boolean isEnabled() {