diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java index 27bc53164..fed010143 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java @@ -387,7 +387,7 @@ public Table getTableSchemaFromTable(SqlServerChangeTable changeTable) throws SQ final List pkColumnNames = readPrimaryKeyOrUniqueIndexNames(metadata, changeTable.getSourceTableId()); Collections.sort(columns); - return Table.editorSkippableColumns() + return Table.editor() .tableId(changeTable.getSourceTableId()) .addColumns(columns) .setPrimaryKeyNames(pkColumnNames) diff --git a/debezium-core/src/main/java/io/debezium/relational/Table.java b/debezium-core/src/main/java/io/debezium/relational/Table.java index 84316cf4e..aad1aaab5 100644 --- a/debezium-core/src/main/java/io/debezium/relational/Table.java +++ b/debezium-core/src/main/java/io/debezium/relational/Table.java @@ -26,10 +26,6 @@ static TableEditor editor() { return new TableEditorImpl(); } - static TableEditor editorSkippableColumns() { - return new TableEditorSkippableColumnsImpl(); - } - /** * Get the identifier for this table. * @return the identifier; never null @@ -150,4 +146,6 @@ default boolean isOptional(String columnName) { * @return the editor; never null */ TableEditor edit(); + + void toString(StringBuilder sb, String prefix); } diff --git a/debezium-core/src/main/java/io/debezium/relational/TableEditorSkippableColumnsImpl.java b/debezium-core/src/main/java/io/debezium/relational/TableEditorSkippableColumnsImpl.java deleted file mode 100644 index 511365fbe..000000000 --- a/debezium-core/src/main/java/io/debezium/relational/TableEditorSkippableColumnsImpl.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.relational; - -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Allows for columns to be "missing" or "skipped" from the table to support captured_column_list for sql sever connector. - * Example, columns 1,2,4 are populated, but 3 is excluded from the CDC tables. - */ -final class TableEditorSkippableColumnsImpl extends TableEditorImpl { - - /** - * Trusts inputted column position to be correct. - */ - @Override - protected void add(Column defn) { - if (defn != null) { - getSortedColumns().put(defn.name().toLowerCase(), defn); - } - assert positionsAreValid(); - } - - /** - * Do not automatically update columns. - */ - @Override - protected void updatePositions() { - // Do nothing and allow columns to maintain their inputted positions. - } - - /** - * Verify columns are incrementing by some amount/unique. - */ - @Override - protected boolean positionsAreValid() { - AtomicInteger position = new AtomicInteger(1); - return getSortedColumns().values().stream().allMatch(defn -> defn.position() >= position.getAndSet(defn.position() + 1)); - } -} diff --git a/debezium-core/src/main/java/io/debezium/relational/TableImpl.java b/debezium-core/src/main/java/io/debezium/relational/TableImpl.java index 10bbfdc57..8a32fd134 100644 --- a/debezium-core/src/main/java/io/debezium/relational/TableImpl.java +++ b/debezium-core/src/main/java/io/debezium/relational/TableImpl.java @@ -96,7 +96,8 @@ public String toString() { return sb.toString(); } - protected void toString(StringBuilder sb, String prefix) { + @Override + public void toString(StringBuilder sb, String prefix) { if (prefix == null) { prefix = ""; } diff --git a/debezium-core/src/main/java/io/debezium/relational/Tables.java b/debezium-core/src/main/java/io/debezium/relational/Tables.java index 73fff08d8..6b87c67bf 100644 --- a/debezium-core/src/main/java/io/debezium/relational/Tables.java +++ b/debezium-core/src/main/java/io/debezium/relational/Tables.java @@ -182,8 +182,14 @@ public Set drainChanges() { public Table overwriteTable(TableId tableId, List columnDefs, List primaryKeyColumnNames, String defaultCharsetName) { return lock.write(() -> { - TableImpl updated = new TableImpl(tableId, columnDefs, primaryKeyColumnNames, defaultCharsetName); - TableImpl existing = tablesByTableId.get(tableId); + Table updated = Table.editor() + .tableId(tableId) + .addColumns(columnDefs) + .setPrimaryKeyNames(primaryKeyColumnNames) + .setDefaultCharsetName(defaultCharsetName) + .create(); + + Table existing = tablesByTableId.get(tableId); if (existing == null || !existing.equals(updated)) { // Our understanding of the table has changed ... changes.add(tableId); @@ -264,7 +270,7 @@ public Table renameTable(TableId existingTableId, TableId newTableId) { */ public Table updateTable(TableId tableId, Function changer) { return lock.write(() -> { - TableImpl existing = tablesByTableId.get(tableId); + Table existing = tablesByTableId.get(tableId); Table updated = changer.apply(existing); if (updated != existing) { tablesByTableId.put(tableId, new TableImpl(tableId, updated.columns(), @@ -403,7 +409,7 @@ public String toString() { private static class TablesById { private final boolean tableIdCaseInsensitive; - private final ConcurrentMap values; + private final ConcurrentMap values; public TablesById(boolean tableIdCaseInsensitive) { this.tableIdCaseInsensitive = tableIdCaseInsensitive; @@ -428,15 +434,15 @@ public void putAll(TablesById tablesByTableId) { } } - public TableImpl remove(TableId tableId) { + public Table remove(TableId tableId) { return values.remove(toLowerCaseIfNeeded(tableId)); } - public TableImpl get(TableId tableId) { + public Table get(TableId tableId) { return values.get(toLowerCaseIfNeeded(tableId)); } - public Table put(TableId tableId, TableImpl updated) { + public Table put(TableId tableId, Table updated) { return values.put(toLowerCaseIfNeeded(tableId), updated); } @@ -444,11 +450,11 @@ int size() { return values.size(); } - void forEach(BiConsumer action) { + void forEach(BiConsumer action) { values.forEach(action); } - Set> entrySet() { + Set> entrySet() { return values.entrySet(); }