DBZ-1009 Support for unknown types during snapshot

This commit is contained in:
Jiri Pechanec 2018-11-29 07:01:17 +01:00 committed by Gunnar Morling
parent 2c3d2ee083
commit 68766e9355
5 changed files with 45 additions and 3 deletions

View File

@@ -795,4 +795,18 @@ protected Object convertTimestampToLocalDateTime(Column column, Field fieldDefn,
/**
 * Returns the time precision to use for the given column.
 *
 * @param column the column definition describing the source column; never null
 * @return the column's declared scale, or {@code -1} when the column defines no scale
 */
protected int getTimePrecision(Column column) {
return column.scale().orElse(-1);
}
/**
 * Converts binary column data into a Kafka Connect value, first unwrapping
 * PostgreSQL {@code PGobject} payloads (used for unknown/extension types)
 * into their underlying value.
 *
 * @param column the column definition describing the {@code data} value; never null
 * @param fieldDefn the field definition; never null
 * @param data the data object to be converted into a Kafka Connect type
 * @return the converted value, or null if the conversion could not be made and the column allows nulls
 * @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls
 */
protected Object convertBinary(Column column, Field fieldDefn, Object data) {
final Object raw;
if (data instanceof PGobject) {
// PGobject carries the textual representation of an extension type
raw = ((PGobject) data).getValue();
}
else {
raw = data;
}
return super.convertBinary(column, fieldDefn, raw);
}
}

View File

@@ -559,7 +559,7 @@ protected List<SchemaAndValueField> schemasAndValuesForNumericTypesUsingStringEn
// Expected schema/value pairs for the custom-type columns of custom_table.
// NOTE(review): this span is rendered from a diff without -/+ markers; the two
// "i" entries below appear to be the removed (OPTIONAL_BYTES_SCHEMA) and added
// (BYTES_SCHEMA) versions of the same line — presumably only the BYTES_SCHEMA
// entry belongs in the final source. TODO confirm against the repository.
protected List<SchemaAndValueField> schemasAndValuesForCustomTypes() {
return Arrays.asList(new SchemaAndValueField("lt", Schema.OPTIONAL_BYTES_SCHEMA, ByteBuffer.wrap("Top.Collections.Pictures.Astronomy.Galaxies".getBytes())),
new SchemaAndValueField("i", Schema.OPTIONAL_BYTES_SCHEMA, ByteBuffer.wrap("0-393-04002-X".getBytes())),
new SchemaAndValueField("i", Schema.BYTES_SCHEMA, ByteBuffer.wrap("0-393-04002-X".getBytes())),
new SchemaAndValueField("n", Schema.OPTIONAL_STRING_SCHEMA, null));
}

View File

@@ -129,7 +129,7 @@ public void shouldLoadSchemaForExtensionPostgresTypes() throws Exception {
schema.refresh(connection, false);
assertTablesIncluded(TEST_TABLES);
assertTableSchema("public.custom_table", "lt, i",
Schema.OPTIONAL_BYTES_SCHEMA, Schema.OPTIONAL_BYTES_SCHEMA);
Schema.OPTIONAL_BYTES_SCHEMA, Schema.BYTES_SCHEMA);
}
}

View File

@@ -31,6 +31,7 @@
import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
/**
* Integration test for {@link RecordsSnapshotProducerIT}
@@ -91,6 +92,33 @@ public void shouldGenerateSnapshotsForDefaultDatatypes() throws Exception {
}
}
@Test
public void shouldGenerateSnapshotsForCustomDatatypes() throws Exception {
// Build a connector config that takes an initial snapshot and, crucially,
// enables INCLUDE_UNKNOWN_DATATYPES so extension types (ltree, isbn) are
// emitted rather than skipped (DBZ-1009).
final PostgresConnectorConfig config = new PostgresConnectorConfig(
TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL)
.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
.build()
);
context = new PostgresTaskContext(
config,
TestHelper.getSchema(config),
PostgresTopicSelector.create(config)
);
// 'false' — do not continue streaming after the snapshot completes.
snapshotProducer = new RecordsSnapshotProducer(context, new SourceInfo(TestHelper.TEST_SERVER, TestHelper.TEST_DATABASE), false);
// Exactly one record is expected: the single row inserted below.
final TestConsumer consumer = testConsumer(1, "public");
// Insert the custom-type row before the snapshot starts so it is captured.
TestHelper.execute(INSERT_CUSTOM_TYPES_STMT);
//then start the producer and validate all records are there
snapshotProducer.start(consumer, e -> {});
consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
// Validate the snapshot record against the expected custom-type schemas/values.
final Map<String, List<SchemaAndValueField>> expectedValuesByTopicName = Collect.hashMapOf("public.custom_table", schemasAndValuesForCustomTypes());
consumer.process(record -> assertReadRecord(record, expectedValuesByTopicName));
}
@Test
public void shouldGenerateSnapshotAndContinueStreaming() throws Exception {
// PostGIS must not be used

View File

@@ -25,7 +25,7 @@ CREATE TABLE geom_table (pk SERIAL, p POINT, PRIMARY KEY(pk));
-- Fixture tables for the PostgreSQL connector integration tests:
-- range types, array types, custom/extension types (ltree, isbn) and hstore.
CREATE TABLE tstzrange_table (pk serial, unbounded_exclusive_range tstzrange, bounded_inclusive_range tstzrange, PRIMARY KEY(pk));
CREATE TABLE array_table (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], citext_array CITEXT[], PRIMARY KEY(pk));
CREATE TABLE array_table_with_nulls (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], citext_array CITEXT[], PRIMARY KEY(pk));
-- NOTE(review): the two custom_table definitions below look like the removed
-- and added lines of a diff rendered without -/+ markers; running both would
-- fail (duplicate table). Presumably only the "i isbn NOT NULL" variant
-- belongs in the final script — TODO confirm against the repository.
CREATE TABLE custom_table (pk serial, lt ltree, i isbn, n TEXT, PRIMARY KEY(pk));
CREATE TABLE custom_table (pk serial, lt ltree, i isbn NOT NULL, n TEXT, PRIMARY KEY(pk));
CREATE TABLE hstore_table (pk serial, hs hstore, PRIMARY KEY(pk));
CREATE TABLE hstore_table_mul (pk serial, hs hstore, PRIMARY KEY(pk));
CREATE TABLE hstore_table_with_null (pk serial, hs hstore, PRIMARY KEY(pk));