DBZ-1368 Support toasted placeholder in wal2json

This commit is contained in:
Jiri Pechanec 2019-09-16 11:06:19 +02:00 committed by Gunnar Morling
parent 9dd5a16160
commit 29f6d9dd09
9 changed files with 61 additions and 63 deletions

View File

@ -6,10 +6,12 @@
package io.debezium.connector.postgresql;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.ConnectException;
@ -44,6 +46,7 @@ public class PostgresChangeRecordEmitter extends RelationalChangeRecordEmitter {
private final PostgresConnectorConfig connectorConfig;
private final PostgresConnection connection;
private final TableId tableId;
private final boolean isJsonPlugin;
public PostgresChangeRecordEmitter(OffsetContext offset, Clock clock, PostgresConnectorConfig connectorConfig, PostgresSchema schema, PostgresConnection connection, ReplicationMessage message) {
super(offset, clock);
@ -54,6 +57,7 @@ public PostgresChangeRecordEmitter(OffsetContext offset, Clock clock, PostgresCo
this.connection = connection;
this.tableId = PostgresSchema.parse(message.getTable());
this.isJsonPlugin = "wal2json".equals(connectorConfig.plugin().getPostgresPluginName());
Objects.requireNonNull(tableId);
}
@ -144,31 +148,48 @@ private Object[] columnValues(List<ReplicationMessage.Column> columns, TableId t
// based on the replication message without toasted columns for now
List<ReplicationMessage.Column> columnsWithoutToasted = columns.stream().filter(Predicates.not(ReplicationMessage.Column::isToastedColumn)).collect(Collectors.toList());
// JSON does not deliver a list of all columns for REPLICA IDENTITY DEFAULT
Object[] values = new Object[schemaColumns.size()];
Object[] values = new Object[columnsWithoutToasted.size() < schemaColumns.size() ? schemaColumns.size() : columnsWithoutToasted.size()];
final Set<String> undeliveredToastableColumns = new HashSet<>(schema.getToastableColumnsForTableId(table.id()));
for (ReplicationMessage.Column column: columns) {
//DBZ-298 Quoted column names will be sent like that in messages, but stored unquoted in the column names
final String columnName = Strings.unquoteIdentifierPart(column.getName());
undeliveredToastableColumns.remove(columnName);
int position = getPosition(columnName, table, values);
if (position != -1) {
values[position] = column.getValue(() -> (PgConnection) connection.connection(), connectorConfig.includeUnknownDatatypes());
}
}
if (isJsonPlugin) {
for (String columnName: undeliveredToastableColumns) {
int position = getPosition(columnName, table, values);
if (position != -1) {
values[position] = ToastedReplicationMessageColumn.ToastedValue.TOAST;
}
};
}
return values;
}
private int getPosition(String columnName, Table table, Object[] values) {
final Column tableColumn = table.columnWithName(columnName);
if (tableColumn == null) {
logger.warn(
"Internal schema is out-of-sync with incoming decoder events; column {} will be omitted from the change event.",
column.getName());
continue;
columnName);
return -1;
}
int position = tableColumn.position() - 1;
if (position < 0 || position >= values.length) {
logger.warn(
"Internal schema is out-of-sync with incoming decoder events; column {} will be omitted from the change event.",
column.getName());
continue;
columnName);
return -1;
}
values[position] = column.getValue(() -> (PgConnection) connection.connection(), connectorConfig.includeUnknownDatatypes());
return position;
}
return values;
}
private Optional<DataCollectionSchema> newTable(TableId tableId) {
logger.debug("Schema for table '{}' is missing", tableId);
refreshTableFromDatabase(tableId);

View File

@ -52,16 +52,6 @@ public interface MessageDecoder {
*/
ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBuilder builder);
/**
* Allows a message decoder to configure optional options that might or might not be present on the server-side LD
* plug-in. So these options will be tried once, and that causes an exception, the connection will be built without
* them.
*
* @param builder the builder to modify
* @return the amended builder instance
*/
ChainedLogicalStreamBuilder tryOnceOptions(ChainedLogicalStreamBuilder builder);
/**
* Signals to this decoder whether messages contain type metadata or not.
*/

View File

@ -331,8 +331,8 @@ private ReplicationStream createReplicationStream(final LogSequenceNumber startL
try {
s = startPgReplicationStream(startLsn,
plugin.forceRds()
? x -> messageDecoder.optionsWithoutMetadata(messageDecoder.tryOnceOptions(x))
: x -> messageDecoder.optionsWithMetadata(messageDecoder.tryOnceOptions(x)));
? messageDecoder::optionsWithoutMetadata
: messageDecoder::optionsWithMetadata);
messageDecoder.setContainsMetadata(plugin.forceRds() ? false : true);
}
catch (PSQLException e) {

View File

@ -183,11 +183,6 @@ public ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBu
return builder;
}
@Override
public ChainedLogicalStreamBuilder tryOnceOptions(ChainedLogicalStreamBuilder builder) {
return builder;
}
/**
* Callback handler for the 'B' begin replication message.
*

View File

@ -65,9 +65,4 @@ public ChainedLogicalStreamBuilder optionsWithMetadata(ChainedLogicalStreamBuild
public ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBuilder builder) {
return builder;
}
@Override
public ChainedLogicalStreamBuilder tryOnceOptions(ChainedLogicalStreamBuilder builder) {
return builder;
}
}

View File

@ -89,11 +89,6 @@ public ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBu
.withSlotOption("include-timestamp", 1);
}
@Override
public ChainedLogicalStreamBuilder tryOnceOptions(ChainedLogicalStreamBuilder builder) {
return builder.withSlotOption("include-unchanged-toast", 0);
}
@Override
public void setContainsMetadata(boolean containsMetadata) {
this.containsMetadata = containsMetadata;

View File

@ -271,11 +271,6 @@ public ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBu
.withSlotOption("include-timestamp", 1);
}
@Override
public ChainedLogicalStreamBuilder tryOnceOptions(ChainedLogicalStreamBuilder builder) {
return builder.withSlotOption("include-unchanged-toast", 0);
}
@Override
public void setContainsMetadata(boolean containsMetadata) {
this.containsMetadata = containsMetadata;

View File

@ -15,6 +15,7 @@
*
*/
public class DecoderDifferences {
static final String TOASTED_VALUE_PLACEHOLDER = "__DEBEZIUM_TOASTED_VALUE__";
/**
* wal2json plugin does not send events for updates on tables that does not define primary key.
@ -68,12 +69,20 @@ public static boolean areSpecialFPValuesUnsupported() {
}
/**
* wal2json plugin nor pgoutput include toasted column in the update
* wal2json plugin include toasted column in the update
*
* @author Jiri Pechanec
*
*/
public static boolean areToastedValuesPresentInSchema() {
return !wal2Json() && !pgoutput();
return !wal2Json();
}
public static String optionalToastedValuePlaceholder() {
return TOASTED_VALUE_PLACEHOLDER;
}
public static String mandatoryToastedValuePlaceholder() {
return TOASTED_VALUE_PLACEHOLDER;
}
}

View File

@ -70,8 +70,6 @@ public class RecordsStreamProducerIT extends AbstractRecordsProducerTest {
@Rule
public TestRule conditionalFail = new ConditionalFail();
private static final String TOASTED_VALUE_PLACEHOLDER = "__DEBEZIUM_TOASTED_VALUE__";
@Before
public void before() throws Exception {
// ensure the slot is deleted for each test
@ -1142,13 +1140,13 @@ public void shouldNotPropagateUnchangedToastedData() throws Exception {
), consumer.remove(), Envelope.FieldName.AFTER);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 2),
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER),
new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER)
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, DecoderDifferences.optionalToastedValuePlaceholder()),
new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, DecoderDifferences.mandatoryToastedValuePlaceholder())
), consumer.remove(), Envelope.FieldName.AFTER);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 2),
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER),
new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER)
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, DecoderDifferences.optionalToastedValuePlaceholder()),
new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, DecoderDifferences.mandatoryToastedValuePlaceholder())
), consumer.remove(), Envelope.FieldName.AFTER);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 3),
@ -1157,13 +1155,13 @@ public void shouldNotPropagateUnchangedToastedData() throws Exception {
), consumer.remove(), Envelope.FieldName.AFTER);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 3),
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER),
new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER)
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, DecoderDifferences.optionalToastedValuePlaceholder()),
new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, DecoderDifferences.mandatoryToastedValuePlaceholder())
), consumer.remove(), Envelope.FieldName.AFTER);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 3),
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER),
new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER)
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, DecoderDifferences.optionalToastedValuePlaceholder()),
new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, DecoderDifferences.mandatoryToastedValuePlaceholder())
), consumer.remove(), Envelope.FieldName.AFTER);
}
@ -1406,7 +1404,7 @@ private void testReceiveChangesForReplicaIdentityFullTableWithToastedValue(Postg
executeAndWait("UPDATE test_table set not_toast = 20");
SourceRecord updatedRecord = consumer.remove();
if (DecoderDifferences.areToastedValuesPresentInSchema()) {
if (DecoderDifferences.areToastedValuesPresentInSchema() || mode == SchemaRefreshMode.COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST) {
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("id", SchemaBuilder.INT32_SCHEMA, 1),
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10),
@ -1415,7 +1413,7 @@ private void testReceiveChangesForReplicaIdentityFullTableWithToastedValue(Postg
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("id", SchemaBuilder.INT32_SCHEMA, 1),
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 20),
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER)
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, DecoderDifferences.optionalToastedValuePlaceholder())
), updatedRecord, Envelope.FieldName.AFTER);
}
else {