DBZ-644 Adding test for Postgres
This commit is contained in:
parent
068aa85bd6
commit
b0980b994c
@ -258,6 +258,29 @@ protected List<SchemaAndValueField> schemasAndValuesForStringTypes() {
|
||||
);
|
||||
}
|
||||
|
||||
protected List<SchemaAndValueField> schemasAndValuesForStringTypesWithSourceColumnTypeInfo() {
|
||||
return Arrays.asList(new SchemaAndValueField("vc",
|
||||
SchemaBuilder.string().optional()
|
||||
.parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "VARCHAR")
|
||||
.parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "2")
|
||||
.build(),
|
||||
"\u017E\u0161"
|
||||
),
|
||||
new SchemaAndValueField("vcv",
|
||||
SchemaBuilder.string().optional()
|
||||
.parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "VARCHAR")
|
||||
.parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "2")
|
||||
.build(),
|
||||
"bb"
|
||||
),
|
||||
new SchemaAndValueField("ch", Schema.OPTIONAL_STRING_SCHEMA, "cdef"),
|
||||
new SchemaAndValueField("c", Schema.OPTIONAL_STRING_SCHEMA, "abc"),
|
||||
new SchemaAndValueField("t", Schema.OPTIONAL_STRING_SCHEMA, "some text"),
|
||||
new SchemaAndValueField("b", Schema.OPTIONAL_BYTES_SCHEMA, ByteBuffer.wrap(new byte[] {0, 1, 2})),
|
||||
new SchemaAndValueField("bnn", Schema.BYTES_SCHEMA, ByteBuffer.wrap(new byte[] {3, 4, 5}))
|
||||
);
|
||||
}
|
||||
|
||||
protected List<SchemaAndValueField> schemasAndValuesForTextTypes() {
|
||||
return Arrays.asList(new SchemaAndValueField("j", Json.builder().optional().build(), "{\"bar\": \"baz\"}"),
|
||||
new SchemaAndValueField("jb", Json.builder().optional().build(), "{\"bar\": \"baz\"}"),
|
||||
|
@ -638,6 +638,22 @@ public void shouldNotStartAfterStop() throws Exception {
|
||||
recordsProducer.start(consumer, blackHole);
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-644")
|
||||
public void shouldPropagateSourceColumnTypeToSchemaParameter() throws Exception {
|
||||
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
|
||||
.with("column.propagate.source.type", ".*vc.*")
|
||||
.build());
|
||||
setupRecordsProducer(config);
|
||||
|
||||
TestHelper.executeDDL("postgres_create_tables.ddl");
|
||||
|
||||
consumer = testConsumer(1);
|
||||
recordsProducer.start(consumer, blackHole);
|
||||
|
||||
assertInsert(INSERT_STRING_TYPES_STMT, schemasAndValuesForStringTypesWithSourceColumnTypeInfo());
|
||||
}
|
||||
|
||||
private void setupRecordsProducer(PostgresConnectorConfig config) {
|
||||
PostgresTopicSelector selector = PostgresTopicSelector.create(config);
|
||||
|
||||
|
@ -39,6 +39,16 @@ public final class TestHelper {
|
||||
*/
|
||||
static final String PRECISION_PARAMETER_KEY = "connect.decimal.precision";
|
||||
|
||||
/**
|
||||
* Key for schema parameter used to store a source column's type name.
|
||||
*/
|
||||
static final String TYPE_NAME_PARAMETER_KEY = "__debezium.source.column.type";
|
||||
|
||||
/**
|
||||
* Key for schema parameter used to store a source column's type length.
|
||||
*/
|
||||
static final String TYPE_LENGTH_PARAMETER_KEY = "__debezium.source.column.length";
|
||||
|
||||
private TestHelper() {
|
||||
}
|
||||
|
||||
|
@ -5,6 +5,8 @@
|
||||
*/
|
||||
package io.debezium.relational.mapping;
|
||||
|
||||
import java.util.Locale;
|
||||
|
||||
import org.apache.kafka.connect.data.SchemaBuilder;
|
||||
|
||||
import io.debezium.relational.Column;
|
||||
@ -29,7 +31,8 @@ public ValueConverter create(Column column) {
|
||||
|
||||
@Override
|
||||
public void alterFieldSchema(Column column, SchemaBuilder schemaBuilder) {
|
||||
schemaBuilder.parameter(TYPE_NAME_PARAMETER_KEY, column.typeName());
|
||||
// upper-casing type names to be consistent across connectors
|
||||
schemaBuilder.parameter(TYPE_NAME_PARAMETER_KEY, column.typeName().toUpperCase(Locale.ENGLISH));
|
||||
|
||||
if (column.length() != Column.UNSET_INT_VALUE) {
|
||||
schemaBuilder.parameter(TYPE_LENGTH_PARAMETER_KEY, String.valueOf(column.length()));
|
||||
|
Loading…
Reference in New Issue
Block a user