DBZ-2495 Updating tables to utilize Table.editor for creating new tables

This commit is contained in:
James Gormley 2020-09-24 06:10:09 -04:00 committed by Gunnar Morling
parent ad2273e9f6
commit 1b845535ab
5 changed files with 20 additions and 58 deletions

View File

@ -387,7 +387,7 @@ public Table getTableSchemaFromTable(SqlServerChangeTable changeTable) throws SQ
final List<String> pkColumnNames = readPrimaryKeyOrUniqueIndexNames(metadata, changeTable.getSourceTableId());
Collections.sort(columns);
return Table.editorSkippableColumns()
return Table.editor()
.tableId(changeTable.getSourceTableId())
.addColumns(columns)
.setPrimaryKeyNames(pkColumnNames)

View File

@ -26,10 +26,6 @@ static TableEditor editor() {
return new TableEditorImpl();
}
static TableEditor editorSkippableColumns() {
return new TableEditorSkippableColumnsImpl();
}
/**
* Get the identifier for this table.
* @return the identifier; never null
@ -150,4 +146,6 @@ default boolean isOptional(String columnName) {
* @return the editor; never null
*/
TableEditor edit();
void toString(StringBuilder sb, String prefix);
}

View File

@ -1,43 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Allows for columns to be "missing" or "skipped" from the table to support captured_column_list for sql sever connector.
* Example, columns 1,2,4 are populated, but 3 is excluded from the CDC tables.
*/
final class TableEditorSkippableColumnsImpl extends TableEditorImpl {
/**
* Trusts inputted column position to be correct.
*/
@Override
protected void add(Column defn) {
if (defn != null) {
getSortedColumns().put(defn.name().toLowerCase(), defn);
}
assert positionsAreValid();
}
/**
* Do not automatically update columns.
*/
@Override
protected void updatePositions() {
// Do nothing and allow columns to maintain their inputted positions.
}
/**
* Verify columns are incrementing by some amount/unique.
*/
@Override
protected boolean positionsAreValid() {
AtomicInteger position = new AtomicInteger(1);
return getSortedColumns().values().stream().allMatch(defn -> defn.position() >= position.getAndSet(defn.position() + 1));
}
}

View File

@ -96,7 +96,8 @@ public String toString() {
return sb.toString();
}
protected void toString(StringBuilder sb, String prefix) {
@Override
public void toString(StringBuilder sb, String prefix) {
if (prefix == null) {
prefix = "";
}

View File

@ -182,8 +182,14 @@ public Set<TableId> drainChanges() {
public Table overwriteTable(TableId tableId, List<Column> columnDefs, List<String> primaryKeyColumnNames,
String defaultCharsetName) {
return lock.write(() -> {
TableImpl updated = new TableImpl(tableId, columnDefs, primaryKeyColumnNames, defaultCharsetName);
TableImpl existing = tablesByTableId.get(tableId);
Table updated = Table.editor()
.tableId(tableId)
.addColumns(columnDefs)
.setPrimaryKeyNames(primaryKeyColumnNames)
.setDefaultCharsetName(defaultCharsetName)
.create();
Table existing = tablesByTableId.get(tableId);
if (existing == null || !existing.equals(updated)) {
// Our understanding of the table has changed ...
changes.add(tableId);
@ -264,7 +270,7 @@ public Table renameTable(TableId existingTableId, TableId newTableId) {
*/
public Table updateTable(TableId tableId, Function<Table, Table> changer) {
return lock.write(() -> {
TableImpl existing = tablesByTableId.get(tableId);
Table existing = tablesByTableId.get(tableId);
Table updated = changer.apply(existing);
if (updated != existing) {
tablesByTableId.put(tableId, new TableImpl(tableId, updated.columns(),
@ -403,7 +409,7 @@ public String toString() {
private static class TablesById {
private final boolean tableIdCaseInsensitive;
private final ConcurrentMap<TableId, TableImpl> values;
private final ConcurrentMap<TableId, Table> values;
public TablesById(boolean tableIdCaseInsensitive) {
this.tableIdCaseInsensitive = tableIdCaseInsensitive;
@ -428,15 +434,15 @@ public void putAll(TablesById tablesByTableId) {
}
}
public TableImpl remove(TableId tableId) {
public Table remove(TableId tableId) {
return values.remove(toLowerCaseIfNeeded(tableId));
}
public TableImpl get(TableId tableId) {
public Table get(TableId tableId) {
return values.get(toLowerCaseIfNeeded(tableId));
}
public Table put(TableId tableId, TableImpl updated) {
public Table put(TableId tableId, Table updated) {
return values.put(toLowerCaseIfNeeded(tableId), updated);
}
@ -444,11 +450,11 @@ int size() {
return values.size();
}
void forEach(BiConsumer<? super TableId, ? super TableImpl> action) {
void forEach(BiConsumer<? super TableId, ? super Table> action) {
values.forEach(action);
}
Set<Map.Entry<TableId, TableImpl>> entrySet() {
Set<Map.Entry<TableId, Table>> entrySet() {
return values.entrySet();
}