DBZ-6589 Support PostgreSQL coercion for UUID, JSON, and JSONB

This commit is contained in:
Chris Cranford 2023-06-21 17:42:23 -04:00 committed by Jiri Pechanec
parent b96868a75b
commit a32e861a78
33 changed files with 251 additions and 43 deletions

View File

@ -24,6 +24,7 @@
import io.debezium.annotation.Immutable;
import io.debezium.connector.jdbc.JdbcSinkConnectorConfig.PrimaryKeyMode;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.data.Envelope;
import io.debezium.data.Envelope.Operation;
@ -145,7 +146,9 @@ public static class FieldDescriptor {
private final Type type;
private final DatabaseDialect dialect;
private final String typeName;
private final String queryBinding;
// Lazily prepared
private String queryBinding;
private FieldDescriptor(Schema schema, String name, boolean key, DatabaseDialect dialect) {
this.schema = schema;
@ -156,7 +159,6 @@ private FieldDescriptor(Schema schema, String name, boolean key, DatabaseDialect
// These are cached here allowing them to be resolved once per record
this.type = dialect.getSchemaType(schema);
this.typeName = type.getTypeName(dialect, schema, key);
this.queryBinding = type.getQueryBinding(schema);
LOGGER.trace("Field [{}] with schema [{}]", name, schema.type());
LOGGER.trace(" Type : {}", type.getClass().getName());
@ -191,7 +193,10 @@ public String getTypeName() {
return typeName;
}
public String getQueryBinding() {
/**
 * Returns the SQL parameter-binding fragment for this field (e.g. {@code ?} or
 * {@code cast(? as uuid)}), resolving it from the field's type on first use and
 * caching the result for subsequent calls.
 *
 * NOTE(review): the cached fragment is computed from the first column passed in and
 * reused for all later calls; this assumes the same column always maps to this field
 * — confirm. The lazy initialization is also not thread-safe — confirm records for a
 * given descriptor are processed single-threaded.
 */
public String getQueryBinding(ColumnDescriptor column) {
if (queryBinding == null) {
queryBinding = type.getQueryBinding(column, schema);
}
return queryBinding;
}

View File

@ -18,6 +18,7 @@
import io.debezium.connector.jdbc.SinkRecordDescriptor;
import io.debezium.connector.jdbc.SinkRecordDescriptor.FieldDescriptor;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.relational.TableDescriptor;
import io.debezium.connector.jdbc.relational.TableId;
import io.debezium.connector.jdbc.type.Type;
@ -141,6 +142,16 @@ default String getAlterTableStatementFieldDelimiter() {
*/
String getDeleteStatement(TableDescriptor table, SinkRecordDescriptor record);
/**
* Returns the SQL binding fragment for a column, schema, and type mapping.
*
* @param column the relational column type, never {@code null}
* @param schema the field schema type, never {@code null}
* @param type the resolved field type, never {@code null}
* @return the query binding SQL fragment
*/
String getQueryBindingWithValueCast(ColumnDescriptor column, Schema schema, Type type);
/**
* Gets the maximum length of a VARCHAR field in a primary key column.
*

View File

@ -95,6 +95,7 @@ public class GeneralDatabaseDialect implements DatabaseDialect {
private final TableNamingStrategy tableNamingStrategy;
private final ColumnNamingStrategy columnNamingStrategy;
private final Map<String, Type> typeRegistry = new HashMap<>();
private final Map<String, String> typeCoercions = new HashMap<>();
private final boolean jdbcTimeZone;
public GeneralDatabaseDialect(JdbcSinkConnectorConfig config, SessionFactory sessionFactory) {
@ -319,7 +320,7 @@ public String getInsertStatement(TableDescriptor table, SinkRecordDescriptor rec
builder.append(") VALUES (");
builder.appendLists(", ", record.getKeyFieldNames(), record.getNonKeyFieldNames(), (name) -> record.getFields().get(name).getQueryBinding());
builder.appendLists(", ", record.getKeyFieldNames(), record.getNonKeyFieldNames(), (name) -> columnQueryBindingFromField(name, table, record));
builder.append(")");
@ -337,11 +338,11 @@ public String getUpdateStatement(TableDescriptor table, SinkRecordDescriptor rec
builder.append("UPDATE ");
builder.append(getQualifiedTableName(table.getId()));
builder.append(" SET ");
builder.appendList(", ", record.getNonKeyFieldNames(), (name) -> columnNameEqualsBinding(name, record));
builder.appendList(", ", record.getNonKeyFieldNames(), (name) -> columnNameEqualsBinding(name, table, record));
if (!record.getKeyFieldNames().isEmpty()) {
builder.append(" WHERE ");
builder.appendList(" AND ", record.getKeyFieldNames(), (name) -> columnNameEqualsBinding(name, record));
builder.appendList(" AND ", record.getKeyFieldNames(), (name) -> columnNameEqualsBinding(name, table, record));
}
return builder.build();
@ -355,12 +356,17 @@ public String getDeleteStatement(TableDescriptor table, SinkRecordDescriptor rec
if (!record.getKeyFieldNames().isEmpty()) {
builder.append(" WHERE ");
builder.appendList(" AND ", record.getKeyFieldNames(), (name) -> columnNameEqualsBinding(name, record));
builder.appendList(" AND ", record.getKeyFieldNames(), (name) -> columnNameEqualsBinding(name, table, record));
}
return builder.build();
}
@Override
public String getQueryBindingWithValueCast(ColumnDescriptor column, Schema schema, Type type) {
// Base dialect applies no value cast: a bare JDBC positional parameter.
// Dialect subclasses may override to emit e.g. "cast(? as <type>)" for columns
// the database will not implicitly coerce a bound value into.
return "?";
}
@Override
public int bindValue(FieldDescriptor field, NativeQuery<?> query, int startIndex, Object value) {
LOGGER.trace("Bind field '{}' at position {} with type {}: {}", field.getName(), startIndex, field.getType().getClass().getName(), value);
@ -623,8 +629,9 @@ protected void addColumnDefaultValue(FieldDescriptor field, StringBuilder column
}
}
protected String columnQueryBindingFromField(String fieldName, SinkRecordDescriptor record) {
return record.getFields().get(fieldName).getQueryBinding();
protected String columnQueryBindingFromField(String fieldName, TableDescriptor table, SinkRecordDescriptor record) {
    // Resolve the table column backing this field, then let the field's type decide
    // how its positional parameter is rendered (plain "?" or a value cast).
    final FieldDescriptor field = record.getFields().get(fieldName);
    final ColumnDescriptor column = table.getColumnByName(columnNameFromField(fieldName, record));
    return field.getQueryBinding(column);
}
protected String columnNameFromField(String fieldName, SinkRecordDescriptor record) {
@ -673,9 +680,10 @@ protected String getQualifiedTableName(TableId tableId) {
return toIdentifier(tableId.getTableName());
}
private String columnNameEqualsBinding(String fieldName, SinkRecordDescriptor record) {
private String columnNameEqualsBinding(String fieldName, TableDescriptor table, SinkRecordDescriptor record) {
    // Renders a "<column>=<binding>" fragment for SET/WHERE clauses,
    // e.g. "data=cast(? as uuid)".
    final ColumnDescriptor column = table.getColumnByName(columnNameFromField(fieldName, record));
    final String binding = record.getFields().get(fieldName).getQueryBinding(column);
    final String columnName = toIdentifier(columnNamingStrategy.resolveColumnName(fieldName));
    return columnName + "=" + binding;
}
private static boolean isColumnNullable(String columnName, Collection<String> primaryKeyColumnNames, int nullability) {

View File

@ -93,7 +93,7 @@ public String getUpsertStatement(TableDescriptor table, SinkRecordDescriptor rec
builder.append("merge into ");
builder.append(getQualifiedTableName(table.getId()));
builder.append(" using (values(");
builder.appendLists(record.getKeyFieldNames(), record.getNonKeyFieldNames(), (name) -> columnQueryBindingFromField(name, record));
builder.appendLists(record.getKeyFieldNames(), record.getNonKeyFieldNames(), (name) -> columnQueryBindingFromField(name, table , record));
builder.append(")) as DAT(");
builder.appendLists(record.getKeyFieldNames(), record.getNonKeyFieldNames(), (name) -> columnNameFromField(name, record));
builder.append(") on ");

View File

@ -8,6 +8,7 @@
import org.apache.kafka.connect.data.Schema;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.AbstractType;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.data.Json;
@ -27,7 +28,7 @@ public String[] getRegistrationKeys() {
}
@Override
public String getQueryBinding(Schema schema) {
// Cast the bound string parameter to the native json column type so the database
// accepts a varchar-bound value for a JSON column.
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
return "cast(? as json)";
}

View File

@ -15,6 +15,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.connector.jdbc.type.connect.AbstractConnectMapType;
@ -31,8 +32,8 @@ class MapToJsonType extends AbstractConnectMapType {
public static final MapToJsonType INSTANCE = new MapToJsonType();
@Override
public String getQueryBinding(Schema schema) {
return JsonType.INSTANCE.getQueryBinding(schema);
// Delegate to JsonType so MAP-schema fields bind with the same json cast fragment.
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
return JsonType.INSTANCE.getQueryBinding(column, schema);
}
@Override

View File

@ -150,7 +150,7 @@ public String getUpsertStatement(TableDescriptor table, SinkRecordDescriptor rec
builder.append(" (");
builder.appendLists(", ", record.getKeyFieldNames(), record.getNonKeyFieldNames(), (name) -> columnNameFromField(name, record));
builder.append(") VALUES (");
builder.appendLists(", ", record.getKeyFieldNames(), record.getNonKeyFieldNames(), (name) -> columnQueryBindingFromField(name, record));
builder.appendLists(", ", record.getKeyFieldNames(), record.getNonKeyFieldNames(), (name) -> columnQueryBindingFromField(name, table, record));
builder.append(") ");
final List<String> updateColumnNames = record.getNonKeyFieldNames().isEmpty()

View File

@ -98,7 +98,7 @@ public String getUpsertStatement(TableDescriptor table, SinkRecordDescriptor rec
builder.append(getQualifiedTableName(table.getId()));
builder.append(" USING (SELECT ");
builder.appendLists(", ", record.getKeyFieldNames(), record.getNonKeyFieldNames(),
(name) -> columnQueryBindingFromField(name, record) + " " + columnNameFromField(name, record));
(name) -> columnQueryBindingFromField(name, table, record) + " " + columnNameFromField(name, record));
builder.append(" FROM dual) ").append("INCOMING ON (");
builder.appendList(" AND ", record.getKeyFieldNames(), (name) -> getUpsertIncomingClause(name, table, record));
builder.append(")");

View File

@ -13,6 +13,7 @@
import org.hibernate.query.Query;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.AbstractType;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.data.Bits;
@ -33,7 +34,7 @@ public String[] getRegistrationKeys() {
}
@Override
public String getQueryBinding(Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
if (isBitOne(schema)) {
final Optional<String> columnType = getSourceColumnType(schema);
if (columnType.isPresent() && "BIT".equals(columnType.get())) {

View File

@ -8,6 +8,7 @@
import org.apache.kafka.connect.data.Schema;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.AbstractType;
import io.debezium.connector.jdbc.type.Type;
@ -26,7 +27,7 @@ public String[] getRegistrationKeys() {
}
@Override
public String getQueryBinding(Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
return "cast(? as citext)";
}

View File

@ -8,6 +8,7 @@
import org.apache.kafka.connect.data.Schema;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.AbstractType;
import io.debezium.connector.jdbc.type.Type;
@ -26,7 +27,7 @@ public String[] getRegistrationKeys() {
}
@Override
public String getQueryBinding(Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
return "cast(? as cidr)";
}

View File

@ -8,6 +8,7 @@
import org.apache.kafka.connect.data.Schema;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.AbstractType;
import io.debezium.connector.jdbc.type.Type;
@ -26,7 +27,7 @@ public String[] getRegistrationKeys() {
}
@Override
public String getQueryBinding(Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
return "cast(? as inet)";
}

View File

@ -9,6 +9,7 @@
import org.hibernate.query.Query;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.AbstractType;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.time.MicroDuration;
@ -28,7 +29,7 @@ public String[] getRegistrationKeys() {
}
@Override
public String getQueryBinding(Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
return "cast(? as interval)";
}

View File

@ -9,6 +9,7 @@
import org.hibernate.query.Query;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.AbstractType;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.data.Json;
@ -28,7 +29,7 @@ public String[] getRegistrationKeys() {
}
@Override
public String getQueryBinding(Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
if (isHstore(schema)) {
return "cast(? as hstore)";
// return super.getQueryBinding(schema);

View File

@ -8,6 +8,7 @@
import org.apache.kafka.connect.data.Schema;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.AbstractType;
import io.debezium.connector.jdbc.type.Type;
@ -26,7 +27,7 @@ public String[] getRegistrationKeys() {
}
@Override
public String getQueryBinding(Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
return "cast(? as ltree)";
}

View File

@ -8,6 +8,7 @@
import org.apache.kafka.connect.data.Schema;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.AbstractType;
import io.debezium.connector.jdbc.type.Type;
@ -26,7 +27,7 @@ public String[] getRegistrationKeys() {
}
@Override
public String getQueryBinding(Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
return "cast(? as macaddr)";
}

View File

@ -11,6 +11,7 @@
import org.hibernate.query.Query;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.connector.jdbc.type.connect.AbstractConnectMapType;
@ -25,7 +26,7 @@ class MapToHstoreType extends AbstractConnectMapType {
public static final MapToHstoreType INSTANCE = new MapToHstoreType();
@Override
public String getQueryBinding(Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
return "cast(? as hstore)";
}

View File

@ -8,6 +8,7 @@
import org.apache.kafka.connect.data.Schema;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.AbstractType;
import io.debezium.connector.jdbc.type.Type;
@ -26,7 +27,7 @@ public String[] getRegistrationKeys() {
}
@Override
public String getQueryBinding(Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
return "cast(? as money)";
}

View File

@ -10,6 +10,7 @@
import org.hibernate.query.Query;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.AbstractType;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.data.geometry.Point;
@ -29,7 +30,7 @@ public String[] getRegistrationKeys() {
}
@Override
public String getQueryBinding(Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
return "cast(? as point)";
}

View File

@ -11,6 +11,7 @@
import java.time.temporal.TemporalAccessor;
import java.util.Optional;
import org.apache.kafka.connect.data.Schema;
import org.hibernate.SessionFactory;
import org.hibernate.dialect.Dialect;
import org.hibernate.dialect.PostgreSQLDialect;
@ -21,8 +22,10 @@
import io.debezium.connector.jdbc.dialect.DatabaseDialectProvider;
import io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect;
import io.debezium.connector.jdbc.dialect.SqlStatementBuilder;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.relational.TableDescriptor;
import io.debezium.connector.jdbc.relational.TableId;
import io.debezium.connector.jdbc.type.Type;
/**
* A {@link DatabaseDialect} implementation for PostgreSQL.
@ -83,7 +86,7 @@ public String getUpsertStatement(TableDescriptor table, SinkRecordDescriptor rec
builder.append(" (");
builder.appendLists(",", record.getKeyFieldNames(), record.getNonKeyFieldNames(), (name) -> columnNameFromField(name, record));
builder.append(") VALUES (");
builder.appendLists(",", record.getKeyFieldNames(), record.getNonKeyFieldNames(), (name) -> columnQueryBindingFromField(name, record));
builder.appendLists(",", record.getKeyFieldNames(), record.getNonKeyFieldNames(), (name) -> columnQueryBindingFromField(name, table, record));
builder.append(") ON CONFLICT (");
builder.appendList(",", record.getKeyFieldNames(), (name) -> columnNameFromField(name, record));
if (record.getNonKeyFieldNames().isEmpty()) {
@ -99,6 +102,23 @@ public String getUpsertStatement(TableDescriptor table, SinkRecordDescriptor rec
return builder.build();
}
@Override
public String getQueryBindingWithValueCast(ColumnDescriptor column, Schema schema, Type type) {
    // PostgreSQL does not implicitly coerce a bound varchar parameter into uuid,
    // json, or jsonb columns, so wrap the parameter in an explicit cast for those
    // target column types. Only STRING-schema fields are candidates for coercion.
    if (schema.type() == Schema.Type.STRING) {
        final String typeName = column.getTypeName();
        // Use equalsIgnoreCase rather than toLowerCase()+equals: String.toLowerCase()
        // is default-locale-sensitive (e.g. the Turkish dotless-i turns "UUID" into
        // "uuıd"), which would silently skip the cast under such locales.
        if ("uuid".equalsIgnoreCase(typeName)) {
            return "cast(? as uuid)";
        }
        else if ("json".equalsIgnoreCase(typeName)) {
            return "cast(? as json)";
        }
        else if ("jsonb".equalsIgnoreCase(typeName)) {
            return "cast(? as jsonb)";
        }
    }
    // Fall back to the general dialect behavior (a plain "?" binding).
    return super.getQueryBindingWithValueCast(column, schema, type);
}
@Override
public String getByteArrayFormat() {
return "'\\x%s'";

View File

@ -9,6 +9,7 @@
import org.hibernate.query.Query;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.AbstractType;
import io.debezium.connector.jdbc.type.Type;
@ -28,7 +29,7 @@ public String[] getRegistrationKeys() {
}
@Override
public String getQueryBinding(Schema schema) {
// Casts the bound parameter to the original source column type carried in the
// schema parameters (e.g. "cast(? as int4range)").
// NOTE(review): the bare orElseThrow() raises NoSuchElementException with no message
// when the source column type parameter is absent (e.g. column type propagation is
// disabled on the source connector) — confirm, and consider a clearer error.
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
return "cast(? as " + getSourceColumnType(schema).orElseThrow() + ")";
}

View File

@ -8,6 +8,7 @@
import org.apache.kafka.connect.data.Schema;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.AbstractType;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.data.Uuid;
@ -27,7 +28,7 @@ public String[] getRegistrationKeys() {
}
@Override
public String getQueryBinding(Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
return "cast(? as uuid)";
}

View File

@ -8,6 +8,7 @@
import org.apache.kafka.connect.data.Schema;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.AbstractType;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.data.Xml;
@ -27,7 +28,7 @@ public String[] getRegistrationKeys() {
}
@Override
public String getQueryBinding(Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
return "cast(? as xml)";
}

View File

@ -13,6 +13,7 @@
import org.hibernate.query.Query;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.AbstractType;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.connector.jdbc.util.ByteArrayUtils;
@ -33,7 +34,7 @@ public String[] getRegistrationKeys() {
}
@Override
public String getQueryBinding(Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
if (Bits.LOGICAL_NAME.equals(schema.name())) {
final int bitSize = Integer.parseInt(schema.parameters().get(Bits.LENGTH_FIELD));
return String.format("cast(? as %s)", bitSize > 1 ? String.format("varbinary(%d)", bitSize) : "bit");

View File

@ -88,7 +88,7 @@ public String getUpsertStatement(TableDescriptor table, SinkRecordDescriptor rec
builder.append(getQualifiedTableName(table.getId()));
builder.append(" WITH (HOLDLOCK) AS TARGET USING (SELECT ");
builder.appendLists(", ", record.getKeyFieldNames(), record.getNonKeyFieldNames(),
(name) -> columnNameFromField(name, columnQueryBindingFromField(name, record) + " AS ", record));
(name) -> columnNameFromField(name, columnQueryBindingFromField(name, table, record) + " AS ", record));
builder.append(") AS INCOMING ON (");
builder.appendList(" AND ", record.getKeyFieldNames(), (name) -> {
final String columnName = columnNameFromField(name, record);

View File

@ -8,6 +8,7 @@
import org.apache.kafka.connect.data.Schema;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.AbstractType;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.data.Xml;
@ -27,7 +28,7 @@ public String[] getRegistrationKeys() {
}
@Override
public String getQueryBinding(Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
return "cast(? as xml)";
}

View File

@ -14,6 +14,7 @@
import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
/**
* An abstract implementation of {@link Type}, which all types should extend.
@ -22,7 +23,6 @@
*/
public abstract class AbstractType implements Type {
private static final String QUERY_BINDING = "?";
private static final String SCHEMA_PARAMETER_COLUMN_TYPE = "__debezium.source.column.type";
private static final String SCHEMA_PARAMETER_COLUMN_SIZE = "__debezium.source.column.length";
private static final String SCHEMA_PARAMETER_COLUMN_PRECISION = "__debezium.source.column.scale";
@ -35,8 +35,8 @@ public void configure(JdbcSinkConnectorConfig config, DatabaseDialect dialect) {
}
@Override
public String getQueryBinding(Schema schema) {
return QUERY_BINDING;
// Default binding: delegate to the dialect so it can apply a column-aware value
// cast (e.g. PostgreSQL's "cast(? as uuid)"). Type subclasses override this when
// they need a type-specific cast regardless of the target column.
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
return getDialect().getQueryBindingWithValueCast(column, schema, this);
}
@Override

View File

@ -10,6 +10,7 @@
import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
/**
* A type represents a relational column type used for query and parameter binding.
@ -52,10 +53,11 @@ public interface Type {
/**
* Return the SQL string to be used in DML statements for binding this type to SQL.
*
* @param column column descriptor in the table relational model, never {@code null}
* @param schema field schema, never {@code null}
* @return query parameter argument binding SQL fragment
*/
String getQueryBinding(Schema schema);
String getQueryBinding(ColumnDescriptor column, Schema schema);
/**
* Resolve the default value clause value.

View File

@ -11,6 +11,7 @@
import org.hibernate.query.Query;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.Type;
/**
@ -24,8 +25,8 @@ public class ConnectMapToConnectStringType extends AbstractConnectMapType {
public static final ConnectMapToConnectStringType INSTANCE = new ConnectMapToConnectStringType();
@Override
public String getQueryBinding(Schema schema) {
return ConnectStringType.INSTANCE.getQueryBinding(schema);
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
return ConnectStringType.INSTANCE.getQueryBinding(column, schema);
}
@Override

View File

@ -15,6 +15,7 @@
import org.hibernate.query.Query;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.AbstractTimeType;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.connector.jdbc.util.DateTimeUtils;
@ -34,7 +35,7 @@ public String[] getRegistrationKeys() {
}
@Override
public String getQueryBinding(Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
return getDialect().getTimeQueryBinding();
}

View File

@ -13,6 +13,7 @@
import org.hibernate.query.Query;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.AbstractTimeType;
/**
@ -23,7 +24,7 @@
public abstract class AbstractDebeziumTimeType extends AbstractTimeType {
@Override
public String getQueryBinding(Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
return getDialect().getTimeQueryBinding();
}

View File

@ -0,0 +1,99 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.jdbc.integration.postgres;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
import io.debezium.connector.jdbc.integration.AbstractJdbcSinkTest;
import io.debezium.connector.jdbc.junit.jupiter.PostgresSinkDatabaseContextProvider;
import io.debezium.connector.jdbc.junit.jupiter.Sink;
import io.debezium.connector.jdbc.junit.jupiter.SinkRecordFactoryArgumentsProvider;
import io.debezium.connector.jdbc.util.SinkRecordFactory;
import io.debezium.doc.FixFor;
/**
* Column type mapping tests for PostgreSQL.
*
* @author Chris Cranford
*/
@Tag("all")
@Tag("it")
@Tag("it-postgresql")
@ExtendWith(PostgresSinkDatabaseContextProvider.class)
public class JdbcSinkColumnTypeMappingIT extends AbstractJdbcSinkTest {
public JdbcSinkColumnTypeMappingIT(Sink sink) {
super(sink);
}
// STRING-schema value coerced into a native uuid column (DBZ-6589).
@ParameterizedTest
@ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
@FixFor("DBZ-6589")
public void testShouldCoerceStringTypeToUuidColumnType(SinkRecordFactory factory) throws Exception {
shouldCoerceStringTypeToColumnType(factory, "uuid", "9bc6a215-84b5-4865-a058-9156427c887a", "f54c2926-076a-4db0-846f-14cad99a8307");
}
// STRING-schema value coerced into a native json column (DBZ-6589).
@ParameterizedTest
@ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
@FixFor("DBZ-6589")
public void testShouldCoerceStringTypeToJsonColumnType(SinkRecordFactory factory) throws Exception {
shouldCoerceStringTypeToColumnType(factory, "json", "{\"id\": 12345}", "{\"id\": 67890}");
}
// STRING-schema value coerced into a native jsonb column (DBZ-6589).
@ParameterizedTest
@ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
@FixFor("DBZ-6589")
public void testShouldCoerceStringTypeToJsonbColumnType(SinkRecordFactory factory) throws Exception {
shouldCoerceStringTypeToColumnType(factory, "jsonb", "{\"id\": 12345}", "{\"id\": 67890}");
}
/**
 * Shared scenario: pre-create the destination table with a "data" column of the given
 * native type, then stream an insert followed by an update whose "data" field carries a
 * STRING schema, relying on the dialect to cast the bound string to the column type.
 *
 * NOTE(review): only the resulting column type is asserted at the end; the inserted and
 * updated row values are never read back — consider also verifying row contents.
 */
private void shouldCoerceStringTypeToColumnType(SinkRecordFactory factory, String columnType, String insertValue,
String updateValue) throws Exception {
// Upsert mode with record-key primary keys and basic schema evolution.
final Map<String, String> properties = getDefaultSinkConfig();
properties.put(JdbcSinkConnectorConfig.SCHEMA_EVOLUTION, JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
properties.put(JdbcSinkConnectorConfig.PRIMARY_KEY_MODE, JdbcSinkConnectorConfig.PrimaryKeyMode.RECORD_KEY.getValue());
properties.put(JdbcSinkConnectorConfig.INSERT_MODE, JdbcSinkConnectorConfig.InsertMode.UPSERT.getValue());
properties.put(JdbcSinkConnectorConfig.DELETE_ENABLED, "false");
startSinkConnector(properties);
assertSinkConnectorIsRunning();
final String tableName = randomTableName();
final String topicName = topicName("server1", "schema", tableName);
final SinkRecord createRecord = factory.createRecordWithSchemaValue(
topicName,
(byte) 1,
"data",
Schema.OPTIONAL_STRING_SCHEMA,
insertValue);
// The table is created manually (before consuming) so the "data" column has the
// native type under test rather than one derived from the record schema.
final String destinationTable = destinationTableName(createRecord);
final String sql = "CREATE TABLE %s (id int not null, data %s null, primary key(id))";
getSink().execute(String.format(sql, destinationTable, columnType));
consume(createRecord);
final SinkRecord updateRecord = factory.updateRecordWithSchemaValue(
topicName,
(byte) 1,
"data",
Schema.OPTIONAL_STRING_SCHEMA,
updateValue);
consume(updateRecord);
getSink().assertColumn(destinationTable, "data", columnType);
}
}

View File

@ -193,6 +193,26 @@ default SinkRecord createRecord(String topicName, byte key) {
.build();
}
/**
 * Builds a "create" (insert) SinkRecord whose value struct contains the byte "id" key
 * field plus one additional field with the supplied name, schema, and value.
 *
 * @param topicName destination topic name
 * @param key value assigned to the "id" key field
 * @param fieldName name of the additional value field
 * @param fieldSchema schema of the additional value field
 * @param value value assigned to the additional field
 * @return the constructed create record
 */
default SinkRecord createRecordWithSchemaValue(String topicName, byte key, String fieldName, Schema fieldSchema, Object value) {
return SinkRecordBuilder.create()
.flat(isFlattened())
.name("prefix")
.topic(topicName)
.offset(1)
.partition(0)
.keySchema(basicKeySchema())
.recordSchema(SchemaBuilder.struct()
.field("id", Schema.INT8_SCHEMA)
.field(fieldName, fieldSchema)
.build())
.sourceSchema(basicSourceSchema())
.key("id", key)
.after("id", key)
.after(fieldName, value)
.source("ts_ms", (int) Instant.now().getEpochSecond())
.build();
}
default SinkRecord createRecordMultipleKeyColumns(String topicName) {
return SinkRecordBuilder.create()
.flat(isFlattened())
@ -227,6 +247,28 @@ default SinkRecord updateRecord(String topicName) {
.build();
}
/**
 * Builds an "update" SinkRecord whose value struct contains the byte "id" key field
 * plus one additional field with the supplied name, schema, and value in both the
 * before and after images.
 *
 * NOTE(review): the before image reuses the same value as the after image, so the
 * record does not represent an actual field-value change — confirm that is intended
 * for the consuming tests.
 *
 * @param topicName destination topic name
 * @param key value assigned to the "id" key field
 * @param fieldName name of the additional value field
 * @param fieldSchema schema of the additional value field
 * @param value value assigned to the additional field in before and after
 * @return the constructed update record
 */
default SinkRecord updateRecordWithSchemaValue(String topicName, byte key, String fieldName, Schema fieldSchema, Object value) {
return SinkRecordBuilder.update()
.flat(isFlattened())
.name("prefix")
.topic(topicName)
.offset(1)
.partition(0)
.keySchema(basicKeySchema())
.recordSchema(SchemaBuilder.struct()
.field("id", Schema.INT8_SCHEMA)
.field(fieldName, fieldSchema)
.build())
.sourceSchema(basicSourceSchema())
.key("id", key)
.before("id", key)
.before(fieldName, value)
.after("id", key)
.after(fieldName, value)
.source("ts_ms", (int) Instant.now().getEpochSecond())
.build();
}
default SinkRecord deleteRecord(String topicName) {
return SinkRecordBuilder.delete()
.flat(isFlattened())