diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java index 976652346..139140b5b 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java @@ -6,10 +6,12 @@ package io.debezium.connector.postgresql; import java.sql.SQLException; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import org.apache.kafka.connect.errors.ConnectException; @@ -44,6 +46,7 @@ public class PostgresChangeRecordEmitter extends RelationalChangeRecordEmitter { private final PostgresConnectorConfig connectorConfig; private final PostgresConnection connection; private final TableId tableId; + private final boolean isJsonPlugin; public PostgresChangeRecordEmitter(OffsetContext offset, Clock clock, PostgresConnectorConfig connectorConfig, PostgresSchema schema, PostgresConnection connection, ReplicationMessage message) { super(offset, clock); @@ -54,6 +57,7 @@ public PostgresChangeRecordEmitter(OffsetContext offset, Clock clock, PostgresCo this.connection = connection; this.tableId = PostgresSchema.parse(message.getTable()); + this.isJsonPlugin = "wal2json".equals(connectorConfig.plugin().getPostgresPluginName()); Objects.requireNonNull(tableId); } @@ -144,31 +148,48 @@ private Object[] columnValues(List columns, TableId t // based on the replication message without toasted columns for now List columnsWithoutToasted = columns.stream().filter(Predicates.not(ReplicationMessage.Column::isToastedColumn)).collect(Collectors.toList()); // JSON does not deliver a list of all columns for REPLICA IDENTITY DEFAULT - Object[] values = new Object[schemaColumns.size()]; + Object[] values = new Object[columnsWithoutToasted.size() < schemaColumns.size() ? schemaColumns.size() : columnsWithoutToasted.size()]; + final Set undeliveredToastableColumns = new HashSet<>(schema.getToastableColumnsForTableId(table.id())); for (ReplicationMessage.Column column: columns) { //DBZ-298 Quoted column names will be sent like that in messages, but stored unquoted in the column names final String columnName = Strings.unquoteIdentifierPart(column.getName()); - final Column tableColumn = table.columnWithName(columnName); - if (tableColumn == null) { - logger.warn( - "Internal schema is out-of-sync with incoming decoder events; column {} will be omitted from the change event.", - column.getName()); - continue; - } - int position = tableColumn.position() - 1; - if (position < 0 || position >= values.length) { - logger.warn( - "Internal schema is out-of-sync with incoming decoder events; column {} will be omitted from the change event.", - column.getName()); - continue; - } - values[position] = column.getValue(() -> (PgConnection) connection.connection(), connectorConfig.includeUnknownDatatypes()); - } + undeliveredToastableColumns.remove(columnName); + int position = getPosition(columnName, table, values); + if (position != -1) { + values[position] = column.getValue(() -> (PgConnection) connection.connection(), connectorConfig.includeUnknownDatatypes()); + } + } + if (isJsonPlugin) { + for (String columnName: undeliveredToastableColumns) { + int position = getPosition(columnName, table, values); + if (position != -1) { + values[position] = ToastedReplicationMessageColumn.ToastedValue.TOAST; + } + }; + } return values; } + private int getPosition(String columnName, Table table, Object[] values) { + final Column tableColumn = table.columnWithName(columnName); + + if (tableColumn == null) { + logger.warn( + "Internal schema is out-of-sync with incoming decoder events; column {} will be omitted from the change event.", + columnName); + return -1; + } + int position = tableColumn.position() - 1; + if (position < 0 || position >= values.length) { + logger.warn( + "Internal schema is out-of-sync with incoming decoder events; column {} will be omitted from the change event.", + columnName); + return -1; + } + return position; + } private Optional newTable(TableId tableId) { logger.debug("Schema for table '{}' is missing", tableId); refreshTableFromDatabase(tableId); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/MessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/MessageDecoder.java index b54401b55..3370b5b4a 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/MessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/MessageDecoder.java @@ -52,16 +52,6 @@ public interface MessageDecoder { */ ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBuilder builder); - /** - * Allows a message decoder to configure optional options that might or might not be present on the server-side LD - * plug-in. So these options will be tried once, and that causes an exception, the connection will be built without - * them. - * - * @param builder the builder to modify - * @return the amended builder instance - */ - ChainedLogicalStreamBuilder tryOnceOptions(ChainedLogicalStreamBuilder builder); - /** * Signals to this decoder whether messages contain type metadata or not. */ diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java index d19953b25..639b3b91b 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java @@ -331,8 +331,8 @@ private ReplicationStream createReplicationStream(final LogSequenceNumber startL try { s = startPgReplicationStream(startLsn, plugin.forceRds() - ? x -> messageDecoder.optionsWithoutMetadata(messageDecoder.tryOnceOptions(x)) - : x -> messageDecoder.optionsWithMetadata(messageDecoder.tryOnceOptions(x))); + ? messageDecoder::optionsWithoutMetadata + : messageDecoder::optionsWithMetadata); messageDecoder.setContainsMetadata(plugin.forceRds() ? false : true); } catch (PSQLException e) { diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java index 189334407..0f87ae856 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java @@ -183,11 +183,6 @@ public ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBu return builder; } - @Override - public ChainedLogicalStreamBuilder tryOnceOptions(ChainedLogicalStreamBuilder builder) { - return builder; - } - /** * Callback handler for the 'B' begin replication message. * diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoMessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoMessageDecoder.java index e18d8f717..80f51d55b 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoMessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoMessageDecoder.java @@ -65,9 +65,4 @@ public ChainedLogicalStreamBuilder optionsWithMetadata(ChainedLogicalStreamBuild public ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBuilder builder) { return builder; } - - @Override - public ChainedLogicalStreamBuilder tryOnceOptions(ChainedLogicalStreamBuilder builder) { - return builder; - } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/NonStreamingWal2JsonMessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/NonStreamingWal2JsonMessageDecoder.java index 9d5d0307e..d665cb465 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/NonStreamingWal2JsonMessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/NonStreamingWal2JsonMessageDecoder.java @@ -89,11 +89,6 @@ public ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBu .withSlotOption("include-timestamp", 1); } - @Override - public ChainedLogicalStreamBuilder tryOnceOptions(ChainedLogicalStreamBuilder builder) { - return builder.withSlotOption("include-unchanged-toast", 0); - } - @Override public void setContainsMetadata(boolean containsMetadata) { this.containsMetadata = containsMetadata; diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/StreamingWal2JsonMessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/StreamingWal2JsonMessageDecoder.java index fdc249d1c..f4df67954 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/StreamingWal2JsonMessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/StreamingWal2JsonMessageDecoder.java @@ -271,11 +271,6 @@ public ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBu .withSlotOption("include-timestamp", 1); } - @Override - public ChainedLogicalStreamBuilder tryOnceOptions(ChainedLogicalStreamBuilder builder) { - return builder.withSlotOption("include-unchanged-toast", 0); - } - @Override public void setContainsMetadata(boolean containsMetadata) { this.containsMetadata = containsMetadata; diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/DecoderDifferences.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/DecoderDifferences.java index 12c5de12a..3d4262860 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/DecoderDifferences.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/DecoderDifferences.java @@ -15,6 +15,7 @@ * */ public class DecoderDifferences { + static final String TOASTED_VALUE_PLACEHOLDER = "__DEBEZIUM_TOASTED_VALUE__"; /** * wal2json plugin does not send events for updates on tables that does not define primary key. @@ -68,12 +69,20 @@ public static boolean areSpecialFPValuesUnsupported() { } /** - * wal2json plugin nor pgoutput include toasted column in the update + * wal2json plugin include toasted column in the update * * @author Jiri Pechanec * */ public static boolean areToastedValuesPresentInSchema() { - return !wal2Json() && !pgoutput(); + return !wal2Json(); + } + + public static String optionalToastedValuePlaceholder() { + return TOASTED_VALUE_PLACEHOLDER; + } + + public static String mandatoryToastedValuePlaceholder() { + return TOASTED_VALUE_PLACEHOLDER; } } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java index e65852336..7c747db10 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java @@ -70,8 +70,6 @@ public class RecordsStreamProducerIT extends AbstractRecordsProducerTest { @Rule public TestRule conditionalFail = new ConditionalFail(); - private static final String TOASTED_VALUE_PLACEHOLDER = "__DEBEZIUM_TOASTED_VALUE__"; - @Before public void before() throws Exception { // ensure the slot is deleted for each test @@ -1142,13 +1140,13 @@ public void shouldNotPropagateUnchangedToastedData() throws Exception { ), consumer.remove(), Envelope.FieldName.AFTER); assertRecordSchemaAndValues(Arrays.asList( new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 2), - new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER), - new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER) + new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, DecoderDifferences.optionalToastedValuePlaceholder()), + new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, DecoderDifferences.mandatoryToastedValuePlaceholder()) ), consumer.remove(), Envelope.FieldName.AFTER); assertRecordSchemaAndValues(Arrays.asList( new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 2), - new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER), - new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER) + new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, DecoderDifferences.optionalToastedValuePlaceholder()), + new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, DecoderDifferences.mandatoryToastedValuePlaceholder()) ), consumer.remove(), Envelope.FieldName.AFTER); assertRecordSchemaAndValues(Arrays.asList( new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 3), @@ -1157,13 +1155,13 @@ public void shouldNotPropagateUnchangedToastedData() throws Exception { ), consumer.remove(), Envelope.FieldName.AFTER); assertRecordSchemaAndValues(Arrays.asList( new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 3), - new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER), - new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER) + new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, DecoderDifferences.optionalToastedValuePlaceholder()), + new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, DecoderDifferences.mandatoryToastedValuePlaceholder()) ), consumer.remove(), Envelope.FieldName.AFTER); assertRecordSchemaAndValues(Arrays.asList( new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 3), - new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER), - new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER) + new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, DecoderDifferences.optionalToastedValuePlaceholder()), + new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, DecoderDifferences.mandatoryToastedValuePlaceholder()) ), consumer.remove(), Envelope.FieldName.AFTER); } @@ -1406,7 +1404,7 @@ private void testReceiveChangesForReplicaIdentityFullTableWithToastedValue(Postg executeAndWait("UPDATE test_table set not_toast = 20"); SourceRecord updatedRecord = consumer.remove(); - if (DecoderDifferences.areToastedValuesPresentInSchema()) { + if (DecoderDifferences.areToastedValuesPresentInSchema() || mode == SchemaRefreshMode.COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST) { assertRecordSchemaAndValues(Arrays.asList( new SchemaAndValueField("id", SchemaBuilder.INT32_SCHEMA, 1), new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10), @@ -1415,7 +1413,7 @@ private void testReceiveChangesForReplicaIdentityFullTableWithToastedValue(Postg assertRecordSchemaAndValues(Arrays.asList( new SchemaAndValueField("id", SchemaBuilder.INT32_SCHEMA, 1), new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 20), - new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER) + new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, DecoderDifferences.optionalToastedValuePlaceholder()) ), updatedRecord, Envelope.FieldName.AFTER); } else {