DBZ-2363 Using prefix as message key

This commit is contained in:
Gunnar Morling 2021-11-26 15:38:30 +01:00
parent 26140cea26
commit 5866056a41
3 changed files with 37 additions and 10 deletions

View File

@ -50,8 +50,19 @@ public class LogicalDecodingMessageMonitor {
private final String topicName; private final String topicName;
private final BinaryHandlingMode binaryMode; private final BinaryHandlingMode binaryMode;
private final Encoder base64Encoder; private final Encoder base64Encoder;
/**
* The key schema; a struct like this:
* <p>
* {@code
* { "prefix" : "my-prefix" }
* }
* <p>
* Using a struct over the plain prefix as a string for better evolvability down the road.
*/
private final Schema keySchema;
private final Schema blockSchema; private final Schema blockSchema;
private final Schema schema; private final Schema valueSchema;
public LogicalDecodingMessageMonitor(PostgresConnectorConfig connectorConfig, BlockingConsumer<SourceRecord> sender) { public LogicalDecodingMessageMonitor(PostgresConnectorConfig connectorConfig, BlockingConsumer<SourceRecord> sender) {
this.schemaNameAdjuster = SchemaNameAdjuster.create(); this.schemaNameAdjuster = SchemaNameAdjuster.create();
@ -60,16 +71,23 @@ public LogicalDecodingMessageMonitor(PostgresConnectorConfig connectorConfig, Bl
this.binaryMode = connectorConfig.binaryHandlingMode(); this.binaryMode = connectorConfig.binaryHandlingMode();
this.base64Encoder = Base64.getEncoder(); this.base64Encoder = Base64.getEncoder();
this.blockSchema = SchemaBuilder.struct() this.keySchema = SchemaBuilder.struct()
.name(schemaNameAdjuster.adjust("io.debezium.connector.postgresql.Message")) .name(schemaNameAdjuster.adjust("io.debezium.connector.postgresql.MessageKey"))
.optional()
.field(DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY, Schema.OPTIONAL_STRING_SCHEMA) .field(DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(DEBEZIUM_LOGICAL_DECODING_MESSAGE_CONTENT_KEY, binaryMode.getSchema().build())
.build(); .build();
this.schema = SchemaBuilder.struct() // pg_logical_emit_message accepts null for prefix and content, but these
// messages are not received actually via logical decoding still marking these
// schemas as optional, just in case we will receive null values for either
// field at some point
this.blockSchema = SchemaBuilder.struct()
.name(schemaNameAdjuster.adjust("io.debezium.connector.postgresql.Message"))
.field(DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(DEBEZIUM_LOGICAL_DECODING_MESSAGE_CONTENT_KEY, binaryMode.getSchema().optional().build())
.build();
this.valueSchema = SchemaBuilder.struct()
.name(schemaNameAdjuster.adjust("io.debezium.connector.postgresql.MessageValue")) .name(schemaNameAdjuster.adjust("io.debezium.connector.postgresql.MessageValue"))
.optional()
.field(Envelope.FieldName.OPERATION, Schema.STRING_SCHEMA) .field(Envelope.FieldName.OPERATION, Schema.STRING_SCHEMA)
.field(Envelope.FieldName.TIMESTAMP, Schema.OPTIONAL_INT64_SCHEMA) .field(Envelope.FieldName.TIMESTAMP, Schema.OPTIONAL_INT64_SCHEMA)
.field(Envelope.FieldName.SOURCE, connectorConfig.getSourceInfoStructMaker().schema()) .field(Envelope.FieldName.SOURCE, connectorConfig.getSourceInfoStructMaker().schema())
@ -84,14 +102,17 @@ public void logicalDecodingMessageEvent(Partition partition, OffsetContext offse
logicalMsgStruct.put(DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY, message.getPrefix()); logicalMsgStruct.put(DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY, message.getPrefix());
logicalMsgStruct.put(DEBEZIUM_LOGICAL_DECODING_MESSAGE_CONTENT_KEY, convertContent(message.getContent())); logicalMsgStruct.put(DEBEZIUM_LOGICAL_DECODING_MESSAGE_CONTENT_KEY, convertContent(message.getContent()));
final Struct value = new Struct(schema); Struct key = new Struct(keySchema);
key.put(DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY, message.getPrefix());
final Struct value = new Struct(valueSchema);
value.put(Envelope.FieldName.OPERATION, Envelope.Operation.MESSAGE.code()); value.put(Envelope.FieldName.OPERATION, Envelope.Operation.MESSAGE.code());
value.put(Envelope.FieldName.TIMESTAMP, timestamp); value.put(Envelope.FieldName.TIMESTAMP, timestamp);
value.put(DEBEZIUM_LOGICAL_DECODING_MESSAGE_KEY, logicalMsgStruct); value.put(DEBEZIUM_LOGICAL_DECODING_MESSAGE_KEY, logicalMsgStruct);
value.put(Envelope.FieldName.SOURCE, offsetContext.getSourceInfo()); value.put(Envelope.FieldName.SOURCE, offsetContext.getSourceInfo());
sender.accept(new SourceRecord(partition.getSourcePartition(), offsetContext.getOffset(), topicName, sender.accept(new SourceRecord(partition.getSourcePartition(), offsetContext.getOffset(), topicName,
null, null, null, value.schema(), value)); keySchema, key, value.schema(), value));
if (message.isLastEventForLsn()) { if (message.isLastEventForLsn()) {
offsetContext.getTransactionContext().endTransaction(); offsetContext.getTransactionContext().endTransaction();

View File

@ -247,8 +247,13 @@ public void shouldOnlyConsumeLogicalDecodingMessagesWithIncludedPrefixes() throw
List<SourceRecord> recordsForTopic = records.recordsForTopic(topicName("message")); List<SourceRecord> recordsForTopic = records.recordsForTopic(topicName("message"));
assertThat(recordsForTopic).hasSize(3); assertThat(recordsForTopic).hasSize(3);
assertThat(((Struct) recordsForTopic.get(0).key()).getString("prefix")).isEqualTo("included_prefix");
assertThat(getPrefix(recordsForTopic.get(0))).isEqualTo("included_prefix"); assertThat(getPrefix(recordsForTopic.get(0))).isEqualTo("included_prefix");
assertThat(((Struct) recordsForTopic.get(1).key()).getString("prefix")).isEqualTo("prefix:included");
assertThat(getPrefix(recordsForTopic.get(1))).isEqualTo("prefix:included"); assertThat(getPrefix(recordsForTopic.get(1))).isEqualTo("prefix:included");
assertThat(((Struct) recordsForTopic.get(2).key()).getString("prefix")).isEqualTo("another_included");
assertThat(getPrefix(recordsForTopic.get(2))).isEqualTo("another_included"); assertThat(getPrefix(recordsForTopic.get(2))).isEqualTo("another_included");
} }

View File

@ -1187,7 +1187,8 @@ when those events are read from different partitions.
This event type is only supported through the `pgoutput` plugin on Postgres 14+ (link:https://www.postgresql.org/docs/14/protocol-logicalrep-message-formats.html[Postgres Documentation]) This event type is only supported through the `pgoutput` plugin on Postgres 14+ (link:https://www.postgresql.org/docs/14/protocol-logicalrep-message-formats.html[Postgres Documentation])
==== ====
A _message_ event signals that a generic logical decoding message has been inserted directly into the WAL typically with the `pg_logical_emit_message` function. The message key is `null` in this case. A _message_ event signals that a generic logical decoding message has been inserted directly into the WAL typically with the `pg_logical_emit_message` function.
The message key is a `Struct` with a single field named `prefix` in this case, carrying the prefix specified when inserting the message.
The message value looks like this for transactional messages: The message value looks like this for transactional messages:
[source,json,indent=0,subs="+attributes"] [source,json,indent=0,subs="+attributes"]