diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java index 6c29135b0..976652346 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java @@ -144,9 +144,9 @@ private Object[] columnValues(List columns, TableId t // based on the replication message without toasted columns for now List columnsWithoutToasted = columns.stream().filter(Predicates.not(ReplicationMessage.Column::isToastedColumn)).collect(Collectors.toList()); // JSON does not deliver a list of all columns for REPLICA IDENTITY DEFAULT - Object[] values = new Object[columnsWithoutToasted.size() < schemaColumns.size() ? schemaColumns.size() : columnsWithoutToasted.size()]; + Object[] values = new Object[schemaColumns.size()]; - for (ReplicationMessage.Column column: columnsWithoutToasted) { + for (ReplicationMessage.Column column: columns) { //DBZ-298 Quoted column names will be sent like that in messages, but stored unquoted in the column names final String columnName = Strings.unquoteIdentifierPart(column.getName()); final Column tableColumn = table.columnWithName(columnName); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index cc64ad70c..fd0fac9fc 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -42,6 +42,7 @@ import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.TableId; import io.debezium.relational.Tables.TableFilter; +import io.debezium.util.Strings; /** * The configuration properties for the {@link PostgresConnector} @@ -747,6 +748,16 @@ public static SchemaRefreshMode parse(String value) { "The default is set to 0 ms, which disables tracking xmin.") .withValidation(Field::isNonNegativeLong); + public static final Field TOASTED_VALUE_PLACEHOLDER = Field.create("toasted.value.placeholder") + .withDisplayName("Toasted value placeholder") + .withType(Type.STRING) + .withWidth(Width.MEDIUM) + .withDefault("__DEBEZIUM_TOASTED_VALUE__") + .withImportance(Importance.MEDIUM) + .withDescription("Specify the constant that will be provided by Debezium to indicate that " + + "the original value is a toasted value not provided by the database." + + "If starts with 'hex:' prefix it is expected that the rest of the string repesents hexadecimally encoded octets."); + /** * The set of {@link Field}s defined as part of this configuration. */ @@ -764,7 +775,7 @@ public static SchemaRefreshMode parse(String value) { SSL_ROOT_CERT, SSL_CLIENT_KEY, SNAPSHOT_LOCK_TIMEOUT_MS, SSL_SOCKET_FACTORY, STATUS_UPDATE_INTERVAL_MS, TCP_KEEPALIVE, INCLUDE_UNKNOWN_DATATYPES, RelationalDatabaseConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, SCHEMA_REFRESH_MODE, CommonConnectorConfig.TOMBSTONES_ON_DELETE, - XMIN_FETCH_INTERVAL, SNAPSHOT_MODE_CLASS, CommonConnectorConfig.SOURCE_STRUCT_MAKER_VERSION); + XMIN_FETCH_INTERVAL, TOASTED_VALUE_PLACEHOLDER, SNAPSHOT_MODE_CLASS, CommonConnectorConfig.SOURCE_STRUCT_MAKER_VERSION); private final HStoreHandlingMode hStoreHandlingMode; private final SnapshotMode snapshotMode; @@ -883,6 +894,14 @@ protected Duration xminFetchInterval() { return Duration.ofMillis(getConfig().getLong(PostgresConnectorConfig.XMIN_FETCH_INTERVAL)); } + protected byte[] toastedValuePlaceholder() { + final String placeholder = getConfig().getString(TOASTED_VALUE_PLACEHOLDER); + if (placeholder.startsWith("hex:")) { + Strings.hexStringToByteArray(placeholder.substring(4)); + } + return placeholder.getBytes(); + } + @Override protected SourceInfoStructMaker getSourceInfoStructMaker(Version version) { switch (version) { @@ -901,7 +920,8 @@ protected static ConfigDef configDef() { Field.group(config, "Events", SCHEMA_WHITELIST, SCHEMA_BLACKLIST, TABLE_WHITELIST, TABLE_BLACKLIST, COLUMN_BLACKLIST, INCLUDE_UNKNOWN_DATATYPES, SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, CommonConnectorConfig.TOMBSTONES_ON_DELETE, Heartbeat.HEARTBEAT_INTERVAL, - Heartbeat.HEARTBEAT_TOPICS_PREFIX, CommonConnectorConfig.SOURCE_STRUCT_MAKER_VERSION); + Heartbeat.HEARTBEAT_TOPICS_PREFIX, CommonConnectorConfig.SOURCE_STRUCT_MAKER_VERSION, + TOASTED_VALUE_PLACEHOLDER); Field.group(config, "Connector", CommonConnectorConfig.POLL_INTERVAL_MS, CommonConnectorConfig.MAX_BATCH_SIZE, CommonConnectorConfig.MAX_QUEUE_SIZE, CommonConnectorConfig.SNAPSHOT_DELAY_MS, CommonConnectorConfig.SNAPSHOT_FETCH_SIZE, SNAPSHOT_MODE, SNAPSHOT_LOCK_TIMEOUT_MS, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE, HSTORE_HANDLING_MODE, diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java index 322a74439..30b9300c7 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java @@ -69,8 +69,17 @@ protected PostgresSchema(PostgresConnectorConfig config, TypeRegistry typeRegist } private static TableSchemaBuilder getTableSchemaBuilder(PostgresConnectorConfig config, TypeRegistry typeRegistry, Charset databaseCharset) { - PostgresValueConverter valueConverter = new PostgresValueConverter(databaseCharset, config.getDecimalMode(), config.getTemporalPrecisionMode(), - ZoneOffset.UTC, null, config.includeUnknownDatatypes(), typeRegistry, config.hStoreHandlingMode()); + PostgresValueConverter valueConverter = new PostgresValueConverter( + databaseCharset, + config.getDecimalMode(), + config.getTemporalPrecisionMode(), + ZoneOffset.UTC, + null, + config.includeUnknownDatatypes(), + typeRegistry, + config.hStoreHandlingMode(), + config.toastedValuePlaceholder() + ); return new TableSchemaBuilder(valueConverter, SchemaNameAdjuster.create(LOGGER), config.getSourceInfoStructMaker().schema(), config.getSanitizeFieldNames()); } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java index a1b01549f..2de734e1e 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java @@ -123,16 +123,21 @@ public class PostgresValueConverter extends JdbcValueConverters { private final JsonFactory jsonFactory; + private final String toastPlaceholderString; + private final byte[] toastPlaceholderBinary; + protected PostgresValueConverter(Charset databaseCharset, DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, ZoneOffset defaultOffset, BigIntUnsignedMode bigIntUnsignedMode, boolean includeUnknownDatatypes, TypeRegistry typeRegistry, - HStoreHandlingMode hStoreMode) { + HStoreHandlingMode hStoreMode, byte[] toastPlaceholder) { super(decimalMode, temporalPrecisionMode, defaultOffset, null, bigIntUnsignedMode); this.databaseCharset = databaseCharset; this.jsonFactory = new JsonFactory(); this.includeUnknownDatatypes = includeUnknownDatatypes; this.typeRegistry = typeRegistry; this.hStoreMode = hStoreMode; + this.toastPlaceholderBinary = toastPlaceholder; + this.toastPlaceholderString = new String(toastPlaceholder); } @Override @@ -321,7 +326,7 @@ public ValueConverter converter(Column column, Field fieldDefn) { case PgOid.INT4RANGE_OID: case PgOid.NUM_RANGE_OID: case PgOid.INT8RANGE_OID: - return data -> super.convertString(column, fieldDefn, data); + return data -> convertString(column, fieldDefn, data); case PgOid.POINT: return data -> convertPoint(column, fieldDefn, data); case PgOid.MONEY: @@ -788,9 +793,29 @@ protected int getTimePrecision(Column column) { */ @Override protected Object convertBinary(Column column, Field fieldDefn, Object data) { + if (data instanceof ToastedReplicationMessageColumn.ToastedValue) { + return toastPlaceholderBinary; + } if (data instanceof PgArray) { data = ((PgArray) data).toString(); } return super.convertBinary(column, fieldDefn, (data instanceof PGobject) ? ((PGobject) data).getValue() : data); } + + /** + * Replaces toasted value with a placeholder + * + * @param column the column definition describing the {@code data} value; never null + * @param fieldDefn the field definition; never null + * @param data the data object to be converted into a Kafka Connect type + * @return the converted value, or null if the conversion could not be made and the column allows nulls + * @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls + */ + @Override + protected Object convertString(Column column, Field fieldDefn, Object data) { + if (data instanceof ToastedReplicationMessageColumn.ToastedValue) { + return toastPlaceholderString; + } + return super.convertString(column, fieldDefn, data); + } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/ToastedReplicationMessageColumn.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/ToastedReplicationMessageColumn.java index 359d43532..268eca1e0 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/ToastedReplicationMessageColumn.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/ToastedReplicationMessageColumn.java @@ -18,6 +18,10 @@ */ public class ToastedReplicationMessageColumn extends AbstractReplicationMessageColumn { + public static enum ToastedValue { + TOAST + }; + public ToastedReplicationMessageColumn(String columnName, PostgresType type, String typeWithModifiers, boolean optional, boolean hasMetadata) { super(columnName, type, typeWithModifiers, optional, hasMetadata); } @@ -29,6 +33,6 @@ public boolean isToastedColumn() { @Override public Object getValue(PostgresStreamingChangeEventSource.PgConnectionSupplier connection, boolean includeUnknownDatatypes) { - throw new UnsupportedOperationException("A toasted column does not supply a value"); + return ToastedValue.TOAST; } } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java index 59abfbaf9..8531d243d 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java @@ -1090,6 +1090,8 @@ public void shouldRefreshSchemaOnUnchangedToastedDataWhenSchemaChanged() throws @Test @FixFor("DBZ-842") public void shouldNotPropagateUnchangedToastedData() throws Exception { + final String toastedValuePlaceholder = "__DEBEZIUM_TOASTED_VALUE__"; + startConnector(config -> config .with(PostgresConnectorConfig.SCHEMA_REFRESH_MODE, PostgresConnectorConfig.SchemaRefreshMode.COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST) ); @@ -1140,13 +1142,13 @@ public void shouldNotPropagateUnchangedToastedData() throws Exception { ), consumer.remove(), Envelope.FieldName.AFTER); assertRecordSchemaAndValues(Arrays.asList( new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 2), - new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, null), - new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, "") + new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, toastedValuePlaceholder), + new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, toastedValuePlaceholder) ), consumer.remove(), Envelope.FieldName.AFTER); assertRecordSchemaAndValues(Arrays.asList( new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 2), - new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, null), - new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, "") + new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, toastedValuePlaceholder), + new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, toastedValuePlaceholder) ), consumer.remove(), Envelope.FieldName.AFTER); assertRecordSchemaAndValues(Arrays.asList( new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 3), @@ -1155,13 +1157,13 @@ public void shouldNotPropagateUnchangedToastedData() throws Exception { ), consumer.remove(), Envelope.FieldName.AFTER); assertRecordSchemaAndValues(Arrays.asList( new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 3), - new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, null), - new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, "") + new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, toastedValuePlaceholder), + new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, toastedValuePlaceholder) ), consumer.remove(), Envelope.FieldName.AFTER); assertRecordSchemaAndValues(Arrays.asList( new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 3), - new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, null), - new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, "") + new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, toastedValuePlaceholder), + new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, toastedValuePlaceholder) ), consumer.remove(), Envelope.FieldName.AFTER); }