DBZ-1814 Adding tests for exporting unkown column type
This commit is contained in:
parent
c54e377dd2
commit
02b16e17ac
@ -153,6 +153,7 @@ Randall Hauch
|
||||
Raúl Tovar
|
||||
Renato Mefi
|
||||
René Kerner
|
||||
Robert Hana
|
||||
Roman Kuchar
|
||||
Sagar Rao
|
||||
Sahan Dilshan
|
||||
|
@ -956,6 +956,16 @@ protected Object convertBinaryToBytes(Column column, Field fieldDefn, Object dat
|
||||
return super.convertBinaryToBytes(column, fieldDefn, (data instanceof PGobject) ? ((PGobject) data).getValue() : data);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object convertBinaryToBase64(Column column, Field fieldDefn, Object data) {
|
||||
return super.convertBinaryToBase64(column, fieldDefn, (data instanceof PGobject) ? ((PGobject) data).getValue() : data);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object convertBinaryToHex(Column column, Field fieldDefn, Object data) {
|
||||
return super.convertBinaryToHex(column, fieldDefn, (data instanceof PGobject) ? ((PGobject) data).getValue() : data);
|
||||
}
|
||||
|
||||
/**
|
||||
* Replaces toasted value with a placeholder
|
||||
*
|
||||
|
@ -17,6 +17,7 @@
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalDateTime;
|
||||
@ -116,6 +117,7 @@ public abstract class AbstractRecordsProducerTest extends AbstractConnectorTest
|
||||
"VALUES (E'\\\\001\\\\002\\\\003'::bytea, '0'::bit(1), '11'::bit(2), '00'::bit(2), '000000110000001000000001'::bit(24)," +
|
||||
"'1000000000000000000000000000000000000000000000000000000000000000'::bit(64))";
|
||||
protected static final String INSERT_BYTEA_BINMODE_STMT = "INSERT INTO bytea_binmode_table (ba) VALUES (E'\\\\001\\\\002\\\\003'::bytea)";
|
||||
protected static final String INSERT_CIRCLE_STMT = "INSERT INTO circle_table (ccircle) VALUES ('((10, 20),10)'::circle)";
|
||||
protected static final String INSERT_GEOM_TYPES_STMT = "INSERT INTO geom_table(p) VALUES ('(1,1)'::point)";
|
||||
protected static final String INSERT_TEXT_TYPES_STMT = "INSERT INTO text_table(j, jb, x, u) " +
|
||||
"VALUES ('{\"bar\": \"baz\"}'::json, '{\"bar\": \"baz\"}'::jsonb, " +
|
||||
@ -412,6 +414,18 @@ protected List<SchemaAndValueField> schemaAndValueForByteaBase64() {
|
||||
return Arrays.asList(new SchemaAndValueField("ba", Schema.OPTIONAL_STRING_SCHEMA, "AQID"));
|
||||
}
|
||||
|
||||
protected List<SchemaAndValueField> schemaAndValueForUnknownColumnBytes() {
|
||||
return Arrays.asList(new SchemaAndValueField("ccircle", Schema.OPTIONAL_BYTES_SCHEMA, ByteBuffer.wrap("<(10.0,20.0),10.0>".getBytes(StandardCharsets.UTF_8))));
|
||||
}
|
||||
|
||||
protected List<SchemaAndValueField> schemaAndValueForUnknownColumnBase64() {
|
||||
return Arrays.asList(new SchemaAndValueField("ccircle", Schema.OPTIONAL_STRING_SCHEMA, "PCgxMC4wLDIwLjApLDEwLjA+"));
|
||||
}
|
||||
|
||||
protected List<SchemaAndValueField> schemaAndValueForUnknownColumnHex() {
|
||||
return Arrays.asList(new SchemaAndValueField("ccircle", Schema.OPTIONAL_STRING_SCHEMA, "3c2831302e302c32302e30292c31302e303e"));
|
||||
}
|
||||
|
||||
protected List<SchemaAndValueField> schemasAndValuesForStringTypesWithSourceColumnTypeInfo() {
|
||||
return Arrays.asList(new SchemaAndValueField("vc",
|
||||
SchemaBuilder.string().optional()
|
||||
|
@ -35,6 +35,7 @@
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
||||
import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
|
||||
import io.debezium.data.Bits;
|
||||
@ -981,6 +982,62 @@ public void shouldGenerateSnapshotForByteaAsHexString() throws Exception {
|
||||
consumer.process(record -> assertReadRecord(record, expectedValueByTopicName));
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-1814")
|
||||
public void shouldGenerateSnapshotForUnknownColumnAsBytes() throws Exception {
|
||||
TestHelper.dropAllSchemas();
|
||||
TestHelper.executeDDL("postgres_create_tables.ddl");
|
||||
TestHelper.execute(INSERT_CIRCLE_STMT);
|
||||
|
||||
buildNoStreamProducer(TestHelper.defaultConfig()
|
||||
.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true));
|
||||
|
||||
TestConsumer consumer = testConsumer(1, "public");
|
||||
consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
|
||||
|
||||
final Map<String, List<SchemaAndValueField>> expectedValueByTopicName = Collect.hashMapOf("public.circle_table", schemaAndValueForUnknownColumnBytes());
|
||||
|
||||
consumer.process(record -> assertReadRecord(record, expectedValueByTopicName));
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-1814")
|
||||
public void shouldGenerateSnapshotForUnknownColumnAsBase64() throws Exception {
|
||||
TestHelper.dropAllSchemas();
|
||||
TestHelper.executeDDL("postgres_create_tables.ddl");
|
||||
TestHelper.execute(INSERT_CIRCLE_STMT);
|
||||
|
||||
buildNoStreamProducer(TestHelper.defaultConfig()
|
||||
.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
|
||||
.with(PostgresConnectorConfig.BINARY_HANDLING_MODE, BinaryHandlingMode.BASE64));
|
||||
|
||||
TestConsumer consumer = testConsumer(1, "public");
|
||||
consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
|
||||
|
||||
final Map<String, List<SchemaAndValueField>> expectedValueByTopicName = Collect.hashMapOf("public.circle_table", schemaAndValueForUnknownColumnBase64());
|
||||
|
||||
consumer.process(record -> assertReadRecord(record, expectedValueByTopicName));
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-1814")
|
||||
public void shouldGenerateSnapshotForUnknownColumnAsHex() throws Exception {
|
||||
TestHelper.dropAllSchemas();
|
||||
TestHelper.executeDDL("postgres_create_tables.ddl");
|
||||
TestHelper.execute(INSERT_CIRCLE_STMT);
|
||||
|
||||
buildNoStreamProducer(TestHelper.defaultConfig()
|
||||
.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
|
||||
.with(PostgresConnectorConfig.BINARY_HANDLING_MODE, BinaryHandlingMode.HEX));
|
||||
|
||||
TestConsumer consumer = testConsumer(1, "public");
|
||||
consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
|
||||
|
||||
final Map<String, List<SchemaAndValueField>> expectedValueByTopicName = Collect.hashMapOf("public.circle_table", schemaAndValueForUnknownColumnHex());
|
||||
|
||||
consumer.process(record -> assertReadRecord(record, expectedValueByTopicName));
|
||||
}
|
||||
|
||||
private void buildNoStreamProducer(Configuration.Builder config) {
|
||||
start(PostgresConnector.class, config
|
||||
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_ONLY)
|
||||
|
@ -52,6 +52,7 @@
|
||||
import org.junit.rules.TestRule;
|
||||
|
||||
import io.debezium.config.CommonConnectorConfig;
|
||||
import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.connector.postgresql.PostgresConnectorConfig.IntervalHandlingMode;
|
||||
import io.debezium.connector.postgresql.PostgresConnectorConfig.SchemaRefreshMode;
|
||||
@ -1123,6 +1124,38 @@ public void shouldReceiveByteaHexString() throws Exception {
|
||||
assertInsert(INSERT_BYTEA_BINMODE_STMT, 1, schemaAndValueForByteaHex());
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-1814")
|
||||
public void shouldReceiveUnknownTypeAsBytes() throws Exception {
|
||||
TestHelper.executeDDL("postgres_create_tables.ddl");
|
||||
|
||||
startConnector(config -> config.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true));
|
||||
|
||||
assertInsert(INSERT_CIRCLE_STMT, 1, schemaAndValueForUnknownColumnBytes());
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-1814")
|
||||
public void shouldReceiveUnknownTypeAsBase64() throws Exception {
|
||||
TestHelper.executeDDL("postgres_create_tables.ddl");
|
||||
|
||||
startConnector(config -> config.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
|
||||
.with(PostgresConnectorConfig.BINARY_HANDLING_MODE, BinaryHandlingMode.BASE64));
|
||||
|
||||
assertInsert(INSERT_CIRCLE_STMT, 1, schemaAndValueForUnknownColumnBase64());
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-1814")
|
||||
public void shouldReceiveUnknownTypeAsHex() throws Exception {
|
||||
TestHelper.executeDDL("postgres_create_tables.ddl");
|
||||
|
||||
startConnector(config -> config.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
|
||||
.with(PostgresConnectorConfig.BINARY_HANDLING_MODE, BinaryHandlingMode.HEX));
|
||||
|
||||
assertInsert(INSERT_CIRCLE_STMT, 1, schemaAndValueForUnknownColumnHex());
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-259")
|
||||
public void shouldProcessIntervalDelete() throws Exception {
|
||||
|
@ -43,6 +43,7 @@ CREATE TABLE hstore_table (pk serial, hs hstore, PRIMARY KEY(pk));
|
||||
CREATE TABLE hstore_table_mul (pk serial, hs hstore, hsarr hstore[], PRIMARY KEY(pk));
|
||||
CREATE TABLE hstore_table_with_null (pk serial, hs hstore, PRIMARY KEY(pk));
|
||||
CREATE TABLE hstore_table_with_special (pk serial, hs hstore, PRIMARY KEY(pk));
|
||||
CREATE TABLE circle_table (pk serial, ccircle circle, PRIMARY KEY(pk));
|
||||
|
||||
CREATE TABLE not_null_table (pk serial,
|
||||
val numeric(20,8), created_at timestamp not null, created_at_tz timestamptz not null, ctime time not null,
|
||||
|
@ -1963,6 +1963,7 @@ NOTE: This setting has impact on snapshots only. Events generated by logical dec
|
||||
|When {prodname} meets a field whose data type is unknown, then by default the field is omitted from the change event and a warning is logged.
|
||||
In some cases it may be preferable though to include the field and send it downstream to clients in the opaque binary representation so the clients will decode it themselves.
|
||||
Set to `false` to filter unknown data out of events and `true` to keep them in binary format.
|
||||
The exact representation can be controlled via the {link-prefix}:{link-postgresql-connector}#connector-property-binary-handling-mode[binary handling mode] setting
|
||||
|
||||
NOTE: The clients risk backward compatibility issues. Not only may the database specific binary representation change between releases, but also when the datatype is supported by {prodname} eventually, it will be sent downstream in a logical type, requiring adjustments by consumers. In general, when encountering unsupported data types, please file a feature request so that support can be added.
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user