DBZ-5396 Add support for connector-specific relational model attributes
This commit is contained in:
parent
a10b492df8
commit
f412c2e6a4
@ -81,7 +81,7 @@ public void exitCopyCreateTable(MySqlParser.CopyCreateTableContext ctx) {
|
|||||||
TableId originalTableId = parser.parseQualifiedTableId(ctx.tableName(1).fullId());
|
TableId originalTableId = parser.parseQualifiedTableId(ctx.tableName(1).fullId());
|
||||||
Table original = parser.databaseTables().forTable(originalTableId);
|
Table original = parser.databaseTables().forTable(originalTableId);
|
||||||
if (original != null) {
|
if (original != null) {
|
||||||
parser.databaseTables().overwriteTable(tableId, original.columns(), original.primaryKeyColumnNames(), original.defaultCharsetName());
|
parser.databaseTables().overwriteTable(tableId, original.columns(), original.primaryKeyColumnNames(), original.defaultCharsetName(), original.attributes());
|
||||||
parser.signalCreateTable(tableId, ctx);
|
parser.signalCreateTable(tableId, ctx);
|
||||||
}
|
}
|
||||||
super.exitCopyCreateTable(ctx);
|
super.exitCopyCreateTable(ctx);
|
||||||
|
@ -217,7 +217,7 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext,
|
|||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
snapshotContext.tables.overwriteTable(sourceTable.id(), cdcEnabledSourceColumns,
|
snapshotContext.tables.overwriteTable(sourceTable.id(), cdcEnabledSourceColumns,
|
||||||
cdcEnabledPkColumns, sourceTable.defaultCharsetName());
|
cdcEnabledPkColumns, sourceTable.defaultCharsetName(), sourceTable.attributes());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -54,6 +54,7 @@
|
|||||||
import io.debezium.annotation.ThreadSafe;
|
import io.debezium.annotation.ThreadSafe;
|
||||||
import io.debezium.config.CommonConnectorConfig;
|
import io.debezium.config.CommonConnectorConfig;
|
||||||
import io.debezium.config.Field;
|
import io.debezium.config.Field;
|
||||||
|
import io.debezium.relational.Attribute;
|
||||||
import io.debezium.relational.Column;
|
import io.debezium.relational.Column;
|
||||||
import io.debezium.relational.ColumnEditor;
|
import io.debezium.relational.ColumnEditor;
|
||||||
import io.debezium.relational.RelationalDatabaseConnectorConfig;
|
import io.debezium.relational.RelationalDatabaseConnectorConfig;
|
||||||
@ -1160,6 +1161,8 @@ public void readSchema(Tables tables, String databaseCatalog, String schemaNameP
|
|||||||
final Set<TableId> viewIds = new HashSet<>();
|
final Set<TableId> viewIds = new HashSet<>();
|
||||||
final Set<TableId> tableIds = new HashSet<>();
|
final Set<TableId> tableIds = new HashSet<>();
|
||||||
|
|
||||||
|
Map<TableId, List<Attribute>> attributesByTable = new HashMap<>();
|
||||||
|
|
||||||
int totalTables = 0;
|
int totalTables = 0;
|
||||||
try (final ResultSet rs = metadata.getTables(databaseCatalog, schemaNamePattern, null, supportedTableTypes())) {
|
try (final ResultSet rs = metadata.getTables(databaseCatalog, schemaNamePattern, null, supportedTableTypes())) {
|
||||||
while (rs.next()) {
|
while (rs.next()) {
|
||||||
@ -1172,11 +1175,13 @@ public void readSchema(Tables tables, String databaseCatalog, String schemaNameP
|
|||||||
TableId tableId = new TableId(catalogName, schemaName, tableName);
|
TableId tableId = new TableId(catalogName, schemaName, tableName);
|
||||||
if (tableFilter == null || tableFilter.isIncluded(tableId)) {
|
if (tableFilter == null || tableFilter.isIncluded(tableId)) {
|
||||||
tableIds.add(tableId);
|
tableIds.add(tableId);
|
||||||
|
attributesByTable.putAll(getAttributeDetails(tableId));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
TableId tableId = new TableId(catalogName, schemaName, tableName);
|
TableId tableId = new TableId(catalogName, schemaName, tableName);
|
||||||
viewIds.add(tableId);
|
viewIds.add(tableId);
|
||||||
|
attributesByTable.putAll(getAttributeDetails(tableId));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1205,7 +1210,8 @@ public void readSchema(Tables tables, String databaseCatalog, String schemaNameP
|
|||||||
List<Column> columns = tableEntry.getValue();
|
List<Column> columns = tableEntry.getValue();
|
||||||
Collections.sort(columns);
|
Collections.sort(columns);
|
||||||
String defaultCharsetName = null; // JDBC does not expose character sets
|
String defaultCharsetName = null; // JDBC does not expose character sets
|
||||||
tables.overwriteTable(tableEntry.getKey(), columns, pkColumnNames, defaultCharsetName);
|
List<Attribute> attributes = attributesByTable.getOrDefault(tableEntry.getKey(), Collections.emptyList());
|
||||||
|
tables.overwriteTable(tableEntry.getKey(), columns, pkColumnNames, defaultCharsetName, attributes);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (removeTablesNotFoundInJdbc) {
|
if (removeTablesNotFoundInJdbc) {
|
||||||
@ -1256,6 +1262,11 @@ protected Map<TableId, List<Column>> getColumnsDetails(String databaseCatalog, S
|
|||||||
return columnsByTable;
|
return columnsByTable;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected Map<TableId, List<Attribute>> getAttributeDetails(TableId tableId) {
|
||||||
|
// no-op, allows connectors to populate table attributes during relational table creation
|
||||||
|
return Collections.emptyMap();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a {@link ColumnEditor} representing the current record of the given result set of column metadata, if
|
* Returns a {@link ColumnEditor} representing the current record of the given result set of column metadata, if
|
||||||
* included in column.include.list.
|
* included in column.include.list.
|
||||||
|
@ -0,0 +1,46 @@
|
|||||||
|
/*
|
||||||
|
* Copyright Debezium Authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*/
|
||||||
|
package io.debezium.relational;
|
||||||
|
|
||||||
|
import io.debezium.annotation.Immutable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An immutable attribute associated with a relational table.
|
||||||
|
*
|
||||||
|
* @author Chris Cranford
|
||||||
|
*/
|
||||||
|
@Immutable
|
||||||
|
public interface Attribute {
|
||||||
|
/**
|
||||||
|
* Obtain an attribute editor that can be used to define an attribute.
|
||||||
|
*
|
||||||
|
* @return the editor; never null
|
||||||
|
*/
|
||||||
|
static AttributeEditor editor() {
|
||||||
|
return new AttributeEditorImpl();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The attribute name.
|
||||||
|
*
|
||||||
|
* @return the name of the attribute, never null
|
||||||
|
*/
|
||||||
|
String name();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The attribute value.
|
||||||
|
*
|
||||||
|
* @return the value of the attribute, may be null
|
||||||
|
*/
|
||||||
|
String value();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Obtain an editor that contains the same information as this attribute.
|
||||||
|
*
|
||||||
|
* @return the editor; never null
|
||||||
|
*/
|
||||||
|
AttributeEditor edit();
|
||||||
|
}
|
@ -0,0 +1,56 @@
|
|||||||
|
/*
|
||||||
|
* Copyright Debezium Authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*/
|
||||||
|
package io.debezium.relational;
|
||||||
|
|
||||||
|
import io.debezium.annotation.NotThreadSafe;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An editor for {@link Attribute} instances.
|
||||||
|
*
|
||||||
|
* @author Chris Cranford
|
||||||
|
*/
|
||||||
|
@NotThreadSafe
|
||||||
|
public interface AttributeEditor {
|
||||||
|
/**
|
||||||
|
* Get the name of the attribute.
|
||||||
|
*
|
||||||
|
* @return the attribute name; may be null if not set
|
||||||
|
*/
|
||||||
|
String name();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the value of the attribute.
|
||||||
|
*
|
||||||
|
* @return the attribute value; may be null if not set
|
||||||
|
*/
|
||||||
|
String value();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the name of the attribute.
|
||||||
|
*
|
||||||
|
* @param name the attribute name
|
||||||
|
* @return this editor so callers can chain methods together
|
||||||
|
*/
|
||||||
|
AttributeEditor name(String name);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the value of the attribute.
|
||||||
|
*
|
||||||
|
* @param value the attribute value
|
||||||
|
* @return this editor so callers can chain methods together
|
||||||
|
*/
|
||||||
|
AttributeEditor value(String value);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Obtain an immutable attribute definition representing the current state of this editor.
|
||||||
|
* Typically, an editor is created and used to build an attribute, and then discarded. However, this editor
|
||||||
|
* with its current state can be reused after this method, since the resulting attribute definition no
|
||||||
|
* longer refers to any of the data used in this editor.
|
||||||
|
*
|
||||||
|
* @return the immutable attribute definition; never null
|
||||||
|
*/
|
||||||
|
Attribute create();
|
||||||
|
}
|
@ -0,0 +1,50 @@
|
|||||||
|
/*
|
||||||
|
* Copyright Debezium Authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*/
|
||||||
|
package io.debezium.relational;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Chris Cranford
|
||||||
|
*/
|
||||||
|
final class AttributeEditorImpl implements AttributeEditor {
|
||||||
|
|
||||||
|
private String name;
|
||||||
|
private String value;
|
||||||
|
|
||||||
|
protected AttributeEditorImpl() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String name() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String value() {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AttributeEditor name(String name) {
|
||||||
|
this.name = name;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AttributeEditor value(String value) {
|
||||||
|
this.value = value;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Attribute create() {
|
||||||
|
return new AttributeImpl(name, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return create().toString();
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,61 @@
|
|||||||
|
/*
|
||||||
|
* Copyright Debezium Authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*/
|
||||||
|
package io.debezium.relational;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Relational model implementation of {@link Attribute}.
|
||||||
|
*
|
||||||
|
* @author Chris Cranford
|
||||||
|
*/
|
||||||
|
final class AttributeImpl implements Attribute {
|
||||||
|
|
||||||
|
private final String name;
|
||||||
|
private final String value;
|
||||||
|
|
||||||
|
public AttributeImpl(String name, String value) {
|
||||||
|
this.name = name;
|
||||||
|
this.value = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String name() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String value() {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (this == obj) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (obj instanceof Attribute) {
|
||||||
|
AttributeImpl attribute = (AttributeImpl) obj;
|
||||||
|
return Objects.equals(name, attribute.name) && Objects.equals(value, attribute.value);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(name, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "name='" + name + "', value='" + value + "'";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AttributeEditor edit() {
|
||||||
|
return Attribute.editor().name(name()).value(value());
|
||||||
|
}
|
||||||
|
}
|
@ -132,6 +132,31 @@ public TableEditor renameColumn(String existingName, String newName) {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Attribute> attributes() {
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Attribute attributeWithName(String attributeName) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TableEditor addAttribute(Attribute attribute) {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TableEditor addAttributes(List<Attribute> attributes) {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TableEditor removeAttribute(String attributeName) {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return create().toString();
|
return create().toString();
|
||||||
@ -143,6 +168,7 @@ public Table create() {
|
|||||||
throw new IllegalStateException("Unable to create a table from an editor that has no table ID");
|
throw new IllegalStateException("Unable to create a table from an editor that has no table ID");
|
||||||
}
|
}
|
||||||
List<Column> columns = new ArrayList<>();
|
List<Column> columns = new ArrayList<>();
|
||||||
return new TableImpl(id, columns, primaryKeyColumnNames(), defaultCharsetName, comment);
|
List<Attribute> attributes = new ArrayList<>();
|
||||||
|
return new TableImpl(id, columns, primaryKeyColumnNames(), defaultCharsetName, comment, attributes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -102,6 +102,20 @@ default List<Column> filterColumns(Predicate<Column> predicate) {
|
|||||||
*/
|
*/
|
||||||
String comment();
|
String comment();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the attributes of the table.
|
||||||
|
* @return the table attributes; may be null if not set
|
||||||
|
*/
|
||||||
|
List<Attribute> attributes();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the definition for an attribute in this table with the supplied name.
|
||||||
|
*
|
||||||
|
* @param name the case-insensitive name of the attribute
|
||||||
|
* @return the attribute definition, or null if there is no attribute with the given name
|
||||||
|
*/
|
||||||
|
Attribute attributeWithName(String name);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Determine if the named column is part of the primary key.
|
* Determine if the named column is part of the primary key.
|
||||||
*
|
*
|
||||||
|
@ -239,6 +239,47 @@ default TableEditor addColumn(Column column) {
|
|||||||
*/
|
*/
|
||||||
boolean hasUniqueValues();
|
boolean hasUniqueValues();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the definitions for the attributes in this table. The resulting list should not be modified directly;
|
||||||
|
* instead, the attributes definitions should be defined with {@link #addAttribute(Attribute)} or
|
||||||
|
* {@link #removeAttribute(String)}.
|
||||||
|
*
|
||||||
|
* @return the list of attribute definitions; never null
|
||||||
|
*/
|
||||||
|
List<Attribute> attributes();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the definition for the attribute in this table with the supplied name. The case of the supplied name does not matter.
|
||||||
|
*
|
||||||
|
* @param attributeName the attribute name
|
||||||
|
* @return the attribute definition; or null if no attribute exists with the given name
|
||||||
|
*/
|
||||||
|
Attribute attributeWithName(String attributeName);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a new attribute to this table.
|
||||||
|
*
|
||||||
|
* @param attribute the definition for the attribute to be added
|
||||||
|
* @return this editor so callers can chain methods together
|
||||||
|
*/
|
||||||
|
TableEditor addAttribute(Attribute attribute);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add attributes to this table.
|
||||||
|
*
|
||||||
|
* @param attributes the list of attribute definitions to be added
|
||||||
|
* @return this editor so callers can chain methods together
|
||||||
|
*/
|
||||||
|
TableEditor addAttributes(List<Attribute> attributes);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove an attribute from this table.
|
||||||
|
*
|
||||||
|
* @param attributeName the name of the attribute to be removed
|
||||||
|
* @return this editor so callers can chain methods togethe
|
||||||
|
*/
|
||||||
|
TableEditor removeAttribute(String attributeName);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Obtain an immutable table definition representing the current state of this editor. This editor can be reused
|
* Obtain an immutable table definition representing the current state of this editor. This editor can be reused
|
||||||
* after this method, since the resulting table definition no longer refers to any of the data used in this editor.
|
* after this method, since the resulting table definition no longer refers to any of the data used in this editor.
|
||||||
|
@ -21,6 +21,7 @@ class TableEditorImpl implements TableEditor {
|
|||||||
private boolean uniqueValues = false;
|
private boolean uniqueValues = false;
|
||||||
private String defaultCharsetName;
|
private String defaultCharsetName;
|
||||||
private String comment;
|
private String comment;
|
||||||
|
private LinkedHashMap<String, Attribute> attributes = new LinkedHashMap<>();
|
||||||
|
|
||||||
protected TableEditorImpl() {
|
protected TableEditorImpl() {
|
||||||
}
|
}
|
||||||
@ -232,6 +233,40 @@ public TableEditor renameColumn(String existingName, String newName) {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Attribute> attributes() {
|
||||||
|
return Collections.unmodifiableList(new ArrayList<>(attributes.values()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Attribute attributeWithName(String attributeName) {
|
||||||
|
return attributes.get(attributeName.toLowerCase());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TableEditor addAttribute(Attribute attribute) {
|
||||||
|
if (attribute != null) {
|
||||||
|
attributes.put(attribute.name().toLowerCase(), attribute);
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TableEditor addAttributes(List<Attribute> attributes) {
|
||||||
|
for (Attribute attribute : attributes) {
|
||||||
|
addAttribute(attribute);
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TableEditor removeAttribute(String attributeName) {
|
||||||
|
if (attributeName != null) {
|
||||||
|
attributes.remove(attributeName.toLowerCase());
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
protected void updatePositions() {
|
protected void updatePositions() {
|
||||||
AtomicInteger position = new AtomicInteger(1);
|
AtomicInteger position = new AtomicInteger(1);
|
||||||
sortedColumns.replaceAll((name, defn) -> {
|
sortedColumns.replaceAll((name, defn) -> {
|
||||||
@ -265,6 +300,7 @@ public Table create() {
|
|||||||
columns.add(column);
|
columns.add(column);
|
||||||
});
|
});
|
||||||
updatePrimaryKeys();
|
updatePrimaryKeys();
|
||||||
return new TableImpl(id, columns, primaryKeyColumnNames(), defaultCharsetName, comment);
|
List<Attribute> attributes = new ArrayList<>(this.attributes.values());
|
||||||
|
return new TableImpl(id, columns, primaryKeyColumnNames(), defaultCharsetName, comment, attributes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -22,14 +22,15 @@ final class TableImpl implements Table {
|
|||||||
private final Map<String, Column> columnsByLowercaseName;
|
private final Map<String, Column> columnsByLowercaseName;
|
||||||
private final String defaultCharsetName;
|
private final String defaultCharsetName;
|
||||||
private final String comment;
|
private final String comment;
|
||||||
|
private final List<Attribute> attributes;
|
||||||
|
|
||||||
@PackagePrivate
|
@PackagePrivate
|
||||||
TableImpl(Table table) {
|
TableImpl(Table table) {
|
||||||
this(table.id(), table.columns(), table.primaryKeyColumnNames(), table.defaultCharsetName(), table.comment());
|
this(table.id(), table.columns(), table.primaryKeyColumnNames(), table.defaultCharsetName(), table.comment(), table.attributes());
|
||||||
}
|
}
|
||||||
|
|
||||||
@PackagePrivate
|
@PackagePrivate
|
||||||
TableImpl(TableId id, List<Column> sortedColumns, List<String> pkColumnNames, String defaultCharsetName, String comment) {
|
TableImpl(TableId id, List<Column> sortedColumns, List<String> pkColumnNames, String defaultCharsetName, String comment, List<Attribute> attributes) {
|
||||||
this.id = id;
|
this.id = id;
|
||||||
this.columnDefs = Collections.unmodifiableList(sortedColumns);
|
this.columnDefs = Collections.unmodifiableList(sortedColumns);
|
||||||
this.pkColumnNames = pkColumnNames == null ? Collections.emptyList() : Collections.unmodifiableList(pkColumnNames);
|
this.pkColumnNames = pkColumnNames == null ? Collections.emptyList() : Collections.unmodifiableList(pkColumnNames);
|
||||||
@ -40,6 +41,7 @@ final class TableImpl implements Table {
|
|||||||
this.columnsByLowercaseName = Collections.unmodifiableMap(defsByLowercaseName);
|
this.columnsByLowercaseName = Collections.unmodifiableMap(defsByLowercaseName);
|
||||||
this.defaultCharsetName = defaultCharsetName;
|
this.defaultCharsetName = defaultCharsetName;
|
||||||
this.comment = comment;
|
this.comment = comment;
|
||||||
|
this.attributes = attributes;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -79,6 +81,19 @@ public String comment() {
|
|||||||
return comment;
|
return comment;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Attribute> attributes() {
|
||||||
|
return attributes;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Attribute attributeWithName(String name) {
|
||||||
|
if (attributes == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return attributes.stream().filter(a -> name.equalsIgnoreCase(a.name())).findFirst().orElse(null);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return id.hashCode();
|
return id.hashCode();
|
||||||
@ -118,6 +133,11 @@ public void toString(StringBuilder sb, String prefix) {
|
|||||||
sb.append(prefix).append("primary key: ").append(primaryKeyColumnNames()).append(System.lineSeparator());
|
sb.append(prefix).append("primary key: ").append(primaryKeyColumnNames()).append(System.lineSeparator());
|
||||||
sb.append(prefix).append("default charset: ").append(defaultCharsetName()).append(System.lineSeparator());
|
sb.append(prefix).append("default charset: ").append(defaultCharsetName()).append(System.lineSeparator());
|
||||||
sb.append(prefix).append("comment: ").append(comment()).append(System.lineSeparator());
|
sb.append(prefix).append("comment: ").append(comment()).append(System.lineSeparator());
|
||||||
|
sb.append(prefix).append("attributes: {").append(System.lineSeparator());
|
||||||
|
for (Attribute attribute : attributes) {
|
||||||
|
sb.append(prefix).append(" ").append(attribute).append(System.lineSeparator());
|
||||||
|
}
|
||||||
|
sb.append(prefix).append("}").append(System.lineSeparator());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -179,16 +179,18 @@ public Set<TableId> drainChanges() {
|
|||||||
* @param columnDefs the list of column definitions; may not be null or empty
|
* @param columnDefs the list of column definitions; may not be null or empty
|
||||||
* @param primaryKeyColumnNames the list of the column names that make up the primary key; may be null or empty
|
* @param primaryKeyColumnNames the list of the column names that make up the primary key; may be null or empty
|
||||||
* @param defaultCharsetName the name of the character set that should be used by default
|
* @param defaultCharsetName the name of the character set that should be used by default
|
||||||
|
* @param attributes the list of attribute definitions; may not be null or empty
|
||||||
* @return the previous table definition, or null if there was no prior table definition
|
* @return the previous table definition, or null if there was no prior table definition
|
||||||
*/
|
*/
|
||||||
public Table overwriteTable(TableId tableId, List<Column> columnDefs, List<String> primaryKeyColumnNames,
|
public Table overwriteTable(TableId tableId, List<Column> columnDefs, List<String> primaryKeyColumnNames,
|
||||||
String defaultCharsetName) {
|
String defaultCharsetName, List<Attribute> attributes) {
|
||||||
return lock.write(() -> {
|
return lock.write(() -> {
|
||||||
Table updated = Table.editor()
|
Table updated = Table.editor()
|
||||||
.tableId(tableId)
|
.tableId(tableId)
|
||||||
.addColumns(columnDefs)
|
.addColumns(columnDefs)
|
||||||
.setPrimaryKeyNames(primaryKeyColumnNames)
|
.setPrimaryKeyNames(primaryKeyColumnNames)
|
||||||
.setDefaultCharsetName(defaultCharsetName)
|
.setDefaultCharsetName(defaultCharsetName)
|
||||||
|
.addAttributes(attributes)
|
||||||
.create();
|
.create();
|
||||||
|
|
||||||
Table existing = tablesByTableId.get(tableId);
|
Table existing = tablesByTableId.get(tableId);
|
||||||
@ -251,7 +253,7 @@ public Table renameTable(TableId existingTableId, TableId newTableId) {
|
|||||||
}
|
}
|
||||||
tablesByTableId.remove(existing.id());
|
tablesByTableId.remove(existing.id());
|
||||||
TableImpl updated = new TableImpl(newTableId, existing.columns(),
|
TableImpl updated = new TableImpl(newTableId, existing.columns(),
|
||||||
existing.primaryKeyColumnNames(), existing.defaultCharsetName(), existing.comment());
|
existing.primaryKeyColumnNames(), existing.defaultCharsetName(), existing.comment(), existing.attributes());
|
||||||
try {
|
try {
|
||||||
return tablesByTableId.put(updated.id(), updated);
|
return tablesByTableId.put(updated.id(), updated);
|
||||||
}
|
}
|
||||||
@ -276,7 +278,7 @@ public Table updateTable(TableId tableId, Function<Table, Table> changer) {
|
|||||||
Table updated = changer.apply(existing);
|
Table updated = changer.apply(existing);
|
||||||
if (updated != existing) {
|
if (updated != existing) {
|
||||||
tablesByTableId.put(tableId, new TableImpl(tableId, updated.columns(),
|
tablesByTableId.put(tableId, new TableImpl(tableId, updated.columns(),
|
||||||
updated.primaryKeyColumnNames(), updated.defaultCharsetName(), updated.comment()));
|
updated.primaryKeyColumnNames(), updated.defaultCharsetName(), updated.comment(), updated.attributes()));
|
||||||
}
|
}
|
||||||
changes.add(tableId);
|
changes.add(tableId);
|
||||||
return existing;
|
return existing;
|
||||||
|
@ -14,6 +14,7 @@
|
|||||||
import io.debezium.document.Array.Entry;
|
import io.debezium.document.Array.Entry;
|
||||||
import io.debezium.document.Document;
|
import io.debezium.document.Document;
|
||||||
import io.debezium.document.Value;
|
import io.debezium.document.Value;
|
||||||
|
import io.debezium.relational.Attribute;
|
||||||
import io.debezium.relational.Column;
|
import io.debezium.relational.Column;
|
||||||
import io.debezium.relational.ColumnEditor;
|
import io.debezium.relational.ColumnEditor;
|
||||||
import io.debezium.relational.Table;
|
import io.debezium.relational.Table;
|
||||||
@ -50,6 +51,14 @@ public Document toDocument(TableChange tableChange) {
|
|||||||
}
|
}
|
||||||
document.setDocument("table", toDocument(tableChange.getTable()));
|
document.setDocument("table", toDocument(tableChange.getTable()));
|
||||||
document.setString("comment", tableChange.getTable().comment());
|
document.setString("comment", tableChange.getTable().comment());
|
||||||
|
|
||||||
|
List<Document> attributes = tableChange.getTable().attributes()
|
||||||
|
.stream()
|
||||||
|
.map(this::toDocument)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
document.setArray("attributes", Array.create(attributes));
|
||||||
|
|
||||||
return document;
|
return document;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -105,6 +114,13 @@ private Document toDocument(Column column) {
|
|||||||
return document;
|
return document;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Document toDocument(Attribute attribute) {
|
||||||
|
final Document document = Document.create();
|
||||||
|
document.setString("name", attribute.name());
|
||||||
|
document.setString("value", attribute.value());
|
||||||
|
return document;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TableChanges deserialize(Array array, boolean useCatalogBeforeSchema) {
|
public TableChanges deserialize(Array array, boolean useCatalogBeforeSchema) {
|
||||||
TableChanges tableChanges = new TableChanges();
|
TableChanges tableChanges = new TableChanges();
|
||||||
@ -191,6 +207,12 @@ else if (Boolean.TRUE.equals(hasDefaultValue)) {
|
|||||||
})
|
})
|
||||||
.forEach(editor::addColumn);
|
.forEach(editor::addColumn);
|
||||||
|
|
||||||
|
document.getOrCreateArray("attributes")
|
||||||
|
.streamValues()
|
||||||
|
.map(Value::asDocument)
|
||||||
|
.map(v -> Attribute.editor().name(v.getString("name")).value(v.getString("value")).create())
|
||||||
|
.forEach(editor::addAttribute);
|
||||||
|
|
||||||
editor.setPrimaryKeyNames(document.getArray("primaryKeyColumnNames")
|
editor.setPrimaryKeyNames(document.getArray("primaryKeyColumnNames")
|
||||||
.streamValues()
|
.streamValues()
|
||||||
.map(Value::asString)
|
.map(Value::asString)
|
||||||
|
@ -21,12 +21,14 @@ public class TableEditorTest {
|
|||||||
private TableEditor editor;
|
private TableEditor editor;
|
||||||
private Table table;
|
private Table table;
|
||||||
private ColumnEditor columnEditor;
|
private ColumnEditor columnEditor;
|
||||||
|
private AttributeEditor attributeEditor;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void beforeEach() {
|
public void beforeEach() {
|
||||||
editor = Table.editor();
|
editor = Table.editor();
|
||||||
table = null;
|
table = null;
|
||||||
columnEditor = Column.editor();
|
columnEditor = Column.editor();
|
||||||
|
attributeEditor = Attribute.editor();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -36,6 +38,12 @@ public void shouldNotHaveColumnsIfEmpty() {
|
|||||||
assertThat(editor.primaryKeyColumnNames()).isEmpty();
|
assertThat(editor.primaryKeyColumnNames()).isEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldNotHaveAnyAttributesIfEmpty() {
|
||||||
|
assertThat(editor.attributeWithName("any")).isNull();
|
||||||
|
assertThat(editor.attributes()).isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
@Test(expected = IllegalStateException.class)
|
@Test(expected = IllegalStateException.class)
|
||||||
public void shouldFailToCreateTableWhenEditorIsMissingTableId() {
|
public void shouldFailToCreateTableWhenEditorIsMissingTableId() {
|
||||||
editor.create();
|
editor.create();
|
||||||
|
@ -20,6 +20,7 @@ public class TableTest {
|
|||||||
private Column c2;
|
private Column c2;
|
||||||
private Column c3;
|
private Column c3;
|
||||||
private Column c4;
|
private Column c4;
|
||||||
|
private Attribute a1;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void beforeEach() {
|
public void beforeEach() {
|
||||||
@ -44,11 +45,13 @@ public void beforeEach() {
|
|||||||
.optional(true)
|
.optional(true)
|
||||||
.create())
|
.create())
|
||||||
.setPrimaryKeyNames("C1", "C2")
|
.setPrimaryKeyNames("C1", "C2")
|
||||||
|
.addAttribute(Attribute.editor().name("A1").value("12345").create())
|
||||||
.create();
|
.create();
|
||||||
c1 = table.columnWithName("C1");
|
c1 = table.columnWithName("C1");
|
||||||
c2 = table.columnWithName("C2");
|
c2 = table.columnWithName("C2");
|
||||||
c3 = table.columnWithName("C3");
|
c3 = table.columnWithName("C3");
|
||||||
c4 = table.columnWithName("C4");
|
c4 = table.columnWithName("C4");
|
||||||
|
a1 = table.attributeWithName("A1");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -57,6 +60,7 @@ public void checkPreconditions() {
|
|||||||
assertThat(c2).isNotNull();
|
assertThat(c2).isNotNull();
|
||||||
assertThat(c3).isNotNull();
|
assertThat(c3).isNotNull();
|
||||||
assertThat(c4).isNotNull();
|
assertThat(c4).isNotNull();
|
||||||
|
assertThat(a1).isNotNull();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -67,6 +71,11 @@ public void shouldHaveColumnsWithNames() {
|
|||||||
assertThat(c4.name()).isEqualTo("C4");
|
assertThat(c4.name()).isEqualTo("C4");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldHaveAttributesWithNames() {
|
||||||
|
assertThat(a1.name()).isEqualTo("A1");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldHaveColumnsWithProperPositions() {
|
public void shouldHaveColumnsWithProperPositions() {
|
||||||
assertThat(c1.position()).isEqualTo(1);
|
assertThat(c1.position()).isEqualTo(1);
|
||||||
@ -86,6 +95,11 @@ public void shouldHaveColumns() {
|
|||||||
assertThat(table.columns()).containsExactly(c1, c2, c3, c4);
|
assertThat(table.columns()).containsExactly(c1, c2, c3, c4);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldHaveAttributes() {
|
||||||
|
assertThat(table.attributes()).containsExactly(a1);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldFindColumnsByNameWithExactCase() {
|
public void shouldFindColumnsByNameWithExactCase() {
|
||||||
assertThat(table.columnWithName("C1")).isSameAs(c1);
|
assertThat(table.columnWithName("C1")).isSameAs(c1);
|
||||||
@ -94,6 +108,11 @@ public void shouldFindColumnsByNameWithExactCase() {
|
|||||||
assertThat(table.columnWithName("C4")).isSameAs(c4);
|
assertThat(table.columnWithName("C4")).isSameAs(c4);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldFindAttributesByNameWithExactCase() {
|
||||||
|
assertThat(table.attributeWithName("A1")).isSameAs(a1);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldFindColumnsByNameWithWrongCase() {
|
public void shouldFindColumnsByNameWithWrongCase() {
|
||||||
assertThat(table.columnWithName("c1")).isSameAs(c1);
|
assertThat(table.columnWithName("c1")).isSameAs(c1);
|
||||||
@ -102,12 +121,23 @@ public void shouldFindColumnsByNameWithWrongCase() {
|
|||||||
assertThat(table.columnWithName("c4")).isSameAs(c4);
|
assertThat(table.columnWithName("c4")).isSameAs(c4);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldFindAttributesByNameWithWrongCase() {
|
||||||
|
assertThat(table.attributeWithName("a1")).isSameAs(a1);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldNotFindNonExistantColumnsByName() {
|
public void shouldNotFindNonExistantColumnsByName() {
|
||||||
assertThat(table.columnWithName("c1 ")).isNull();
|
assertThat(table.columnWithName("c1 ")).isNull();
|
||||||
assertThat(table.columnWithName("wrong")).isNull();
|
assertThat(table.columnWithName("wrong")).isNull();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldNotFindNonExistentAttributesByName() {
|
||||||
|
assertThat(table.attributeWithName("a1 ")).isNull();
|
||||||
|
assertThat(table.attributeWithName("wrong")).isNull();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldHavePrimaryKeyColumns() {
|
public void shouldHavePrimaryKeyColumns() {
|
||||||
assertThat(table.primaryKeyColumnNames()).containsExactly("C1", "C2");
|
assertThat(table.primaryKeyColumnNames()).containsExactly("C1", "C2");
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
import io.debezium.document.Array;
|
import io.debezium.document.Array;
|
||||||
import io.debezium.document.Document;
|
import io.debezium.document.Document;
|
||||||
import io.debezium.document.DocumentReader;
|
import io.debezium.document.DocumentReader;
|
||||||
|
import io.debezium.relational.Attribute;
|
||||||
import io.debezium.relational.Column;
|
import io.debezium.relational.Column;
|
||||||
import io.debezium.relational.Table;
|
import io.debezium.relational.Table;
|
||||||
import io.debezium.relational.TableId;
|
import io.debezium.relational.TableId;
|
||||||
@ -73,6 +74,8 @@ public void canSerializeAndDeserializeHistoryRecord() throws Exception {
|
|||||||
.create())
|
.create())
|
||||||
.setPrimaryKeyNames("first")
|
.setPrimaryKeyNames("first")
|
||||||
.setComment("table comment")
|
.setComment("table comment")
|
||||||
|
.addAttribute(Attribute.editor().name("object_id").value("12345").create())
|
||||||
|
.addAttribute(Attribute.editor().name("other").value("test").create())
|
||||||
.create();
|
.create();
|
||||||
|
|
||||||
TableChanges tableChanges = new TableChanges().create(table);
|
TableChanges tableChanges = new TableChanges().create(table);
|
||||||
|
Loading…
Reference in New Issue
Block a user