DBZ-5044 Use SchemaFactory for centralizing Postgres schemas

This commit is contained in:
Anisha Mohanty 2022-07-22 16:08:11 +05:30 committed by Jiri Pechanec
parent cfd0242e28
commit ec792527e9
6 changed files with 150 additions and 71 deletions

View File

@ -12,7 +12,6 @@
import java.util.Base64.Encoder;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
@ -65,6 +64,8 @@ public class LogicalDecodingMessageMonitor {
private final Schema blockSchema;
private final Schema valueSchema;
private static final PostgresSchemaFactory postgresSchemaFactoryObject = new PostgresSchemaFactory();
public LogicalDecodingMessageMonitor(PostgresConnectorConfig connectorConfig, BlockingConsumer<SourceRecord> sender) {
this.schemaNameAdjuster = connectorConfig.schemaNameAdjustmentMode().createAdjuster();
this.sender = sender;
@ -73,28 +74,15 @@ public LogicalDecodingMessageMonitor(PostgresConnectorConfig connectorConfig, Bl
this.base64Encoder = Base64.getEncoder();
this.base64UrlSafeEncoder = Base64.getUrlEncoder();
this.keySchema = SchemaBuilder.struct()
.name(schemaNameAdjuster.adjust("io.debezium.connector.postgresql.MessageKey"))
.field(DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.build();
this.keySchema = postgresSchemaFactoryObject.logicalDecodingMessageMonitorKeySchema(schemaNameAdjuster);
// pg_logical_emit_message accepts null for prefix and content, but these
// messages are not received actually via logical decoding still marking these
// schemas as optional, just in case we will receive null values for either
// field at some point
this.blockSchema = SchemaBuilder.struct()
.name(schemaNameAdjuster.adjust("io.debezium.connector.postgresql.Message"))
.field(DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(DEBEZIUM_LOGICAL_DECODING_MESSAGE_CONTENT_KEY, binaryMode.getSchema().optional().build())
.build();
this.blockSchema = postgresSchemaFactoryObject.logicalDecodingMessageMonitorBlockSchema(schemaNameAdjuster, binaryMode);
this.valueSchema = SchemaBuilder.struct()
.name(schemaNameAdjuster.adjust("io.debezium.connector.postgresql.MessageValue"))
.field(Envelope.FieldName.OPERATION, Schema.STRING_SCHEMA)
.field(Envelope.FieldName.TIMESTAMP, Schema.OPTIONAL_INT64_SCHEMA)
.field(Envelope.FieldName.SOURCE, connectorConfig.getSourceInfoStructMaker().schema())
.field(DEBEZIUM_LOGICAL_DECODING_MESSAGE_KEY, blockSchema)
.build();
this.valueSchema = postgresSchemaFactoryObject.logicalDecodingMessageMonitorValueSchema(schemaNameAdjuster, connectorConfig, binaryMode);
}
public void logicalDecodingMessageEvent(Partition partition, OffsetContext offsetContext, Long timestamp,

View File

@ -0,0 +1,75 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.postgresql.data.Ltree;
import io.debezium.data.Envelope;
import io.debezium.schema.SchemaFactory;
import io.debezium.util.SchemaNameAdjuster;
public class PostgresSchemaFactory extends SchemaFactory {
public PostgresSchemaFactory() {
super();
}
private static final PostgresSchemaFactory postgresSchemaFactoryObject = new PostgresSchemaFactory();
public static PostgresSchemaFactory get() {
return postgresSchemaFactoryObject;
}
/*
* Postgres LogicalDecodingMessageMonitor schema
*/
private static final String POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_KEY_SCHEMA_NAME = "io.debezium.connector.postgresql.MessageKey";
private static final int POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_KEY_SCHEMA_VERSION = 1;
private static final String POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_BLOCK_SCHEMA_NAME = "io.debezium.connector.postgresql.Message";
private static final int POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_BLOCK_SCHEMA_VERSION = 1;
private static final String POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_VALUE_SCHEMA_NAME = "io.debezium.connector.postgresql.MessageValue";
private static final int POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_VALUE_SCHEMA_VERSION = 1;
public Schema logicalDecodingMessageMonitorKeySchema(SchemaNameAdjuster adjuster) {
return SchemaBuilder.struct()
.name(adjuster.adjust(POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_KEY_SCHEMA_NAME))
.version(POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_KEY_SCHEMA_VERSION)
.field(LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.build();
}
public Schema logicalDecodingMessageMonitorBlockSchema(SchemaNameAdjuster adjuster, CommonConnectorConfig.BinaryHandlingMode binaryHandlingMode) {
return SchemaBuilder.struct()
.name(adjuster.adjust(POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_BLOCK_SCHEMA_NAME))
.version(POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_BLOCK_SCHEMA_VERSION)
.field(LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_CONTENT_KEY, binaryHandlingMode.getSchema().optional().build())
.build();
}
public Schema logicalDecodingMessageMonitorValueSchema(SchemaNameAdjuster adjuster, PostgresConnectorConfig config,
CommonConnectorConfig.BinaryHandlingMode binaryHandlingMode) {
return SchemaBuilder.struct()
.name(adjuster.adjust(POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_VALUE_SCHEMA_NAME))
.version(POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_VALUE_SCHEMA_VERSION)
.field(Envelope.FieldName.OPERATION, Schema.STRING_SCHEMA)
.field(Envelope.FieldName.TIMESTAMP, Schema.OPTIONAL_INT64_SCHEMA)
.field(Envelope.FieldName.SOURCE, config.getSourceInfoStructMaker().schema())
.field(LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_KEY, logicalDecodingMessageMonitorBlockSchema(adjuster, binaryHandlingMode))
.build();
}
public SchemaBuilder datatypeLtreeSchema() {
return SchemaBuilder.string()
.name(Ltree.LOGICAL_NAME)
.version(Ltree.SCHEMA_VERSION);
}
}

View File

@ -8,6 +8,8 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import io.debezium.connector.postgresql.PostgresSchemaFactory;
/**
* A semantic type for a Ltree string.
*
@ -16,6 +18,9 @@
public class Ltree {
public static final String LOGICAL_NAME = "io.debezium.data.Ltree";
public static int SCHEMA_VERSION = 1;
private static final PostgresSchemaFactory postgresSchemaFactoryObject = new PostgresSchemaFactory();
/**
* Returns a {@link SchemaBuilder} for a Ltree field. You can use the resulting SchemaBuilder
@ -24,9 +29,7 @@ public class Ltree {
* @return the schema builder
*/
public static SchemaBuilder builder() {
return SchemaBuilder.string()
.name(LOGICAL_NAME)
.version(1);
return postgresSchemaFactoryObject.datatypeLtreeSchema();
}
/**

View File

@ -12,11 +12,10 @@
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import io.debezium.pipeline.txmetadata.TransactionMonitor;
import io.debezium.schema.SchemaFactory;
/**
* An immutable descriptor for the structure of Debezium message envelopes. An {@link Envelope} can be created for each message
@ -27,6 +26,8 @@
*/
public final class Envelope {
public static final int SCHEMA_VERSION = 1;
/**
* The constants for the values for the {@link FieldName#OPERATION operation} field in the message envelope.
*/
@ -195,53 +196,10 @@ default Builder withSource(Schema sourceSchema) {
Envelope build();
}
private static final SchemaFactory schemaFactoryObject = SchemaFactory.get();
public static Builder defineSchema() {
return new Builder() {
private final SchemaBuilder builder = SchemaBuilder.struct();
private final Set<String> missingFields = new HashSet<>();
@Override
public Builder withSchema(Schema fieldSchema, String... fieldNames) {
for (String fieldName : fieldNames) {
builder.field(fieldName, fieldSchema);
}
return this;
}
@Override
public Builder withName(String name) {
builder.name(name);
return this;
}
@Override
public Builder withDoc(String doc) {
builder.doc(doc);
return this;
}
@Override
public Envelope build() {
builder.field(FieldName.OPERATION, OPERATION_REQUIRED ? Schema.STRING_SCHEMA : Schema.OPTIONAL_STRING_SCHEMA);
builder.field(FieldName.TIMESTAMP, Schema.OPTIONAL_INT64_SCHEMA);
builder.field(FieldName.TRANSACTION, TransactionMonitor.TRANSACTION_BLOCK_SCHEMA);
checkFieldIsDefined(FieldName.OPERATION);
checkFieldIsDefined(FieldName.BEFORE);
checkFieldIsDefined(FieldName.AFTER);
checkFieldIsDefined(FieldName.SOURCE);
checkFieldIsDefined(FieldName.TRANSACTION);
if (!missingFields.isEmpty()) {
throw new IllegalStateException("The envelope schema is missing field(s) " + String.join(", ", missingFields));
}
return new Envelope(builder.build());
}
private void checkFieldIsDefined(String fieldName) {
if (builder.field(fieldName) == null) {
missingFields.add(fieldName);
}
}
};
return schemaFactoryObject.datatypeEnvelopeSchema();
}
public static Envelope fromSchema(Schema schema) {
@ -250,7 +208,7 @@ public static Envelope fromSchema(Schema schema) {
private final Schema schema;
private Envelope(Schema schema) {
public Envelope(Schema schema) {
this.schema = schema;
}

View File

@ -5,6 +5,9 @@
*/
package io.debezium.schema;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
@ -13,6 +16,7 @@
import io.debezium.data.Bits;
import io.debezium.data.Enum;
import io.debezium.data.EnumSet;
import io.debezium.data.Envelope;
import io.debezium.data.Json;
import io.debezium.data.Uuid;
import io.debezium.data.VariableScaleDecimal;
@ -77,7 +81,7 @@ public class SchemaFactory {
private static final SchemaFactory schemaFactoryObject = new SchemaFactory();
private SchemaFactory() {
public SchemaFactory() {
}
public static SchemaFactory get() {
@ -251,4 +255,55 @@ public SchemaBuilder datatypeXmlSchema() {
.name(Xml.LOGICAL_NAME)
.version(Xml.SCHEMA_VERSION);
}
public Envelope.Builder datatypeEnvelopeSchema() {
return new Envelope.Builder() {
private final SchemaBuilder builder = SchemaBuilder.struct()
.version(Envelope.SCHEMA_VERSION);
private final Set<String> missingFields = new HashSet<>();
@Override
public Envelope.Builder withSchema(Schema fieldSchema, String... fieldNames) {
for (String fieldName : fieldNames) {
builder.field(fieldName, fieldSchema);
}
return this;
}
@Override
public Envelope.Builder withName(String name) {
builder.name(name);
return this;
}
@Override
public Envelope.Builder withDoc(String doc) {
builder.doc(doc);
return this;
}
@Override
public Envelope build() {
builder.field(Envelope.FieldName.OPERATION, Envelope.OPERATION_REQUIRED ? Schema.STRING_SCHEMA : Schema.OPTIONAL_STRING_SCHEMA);
builder.field(Envelope.FieldName.TIMESTAMP, Schema.OPTIONAL_INT64_SCHEMA);
builder.field(Envelope.FieldName.TRANSACTION, transactionBlockSchema());
checkFieldIsDefined(Envelope.FieldName.OPERATION);
checkFieldIsDefined(Envelope.FieldName.BEFORE);
checkFieldIsDefined(Envelope.FieldName.AFTER);
checkFieldIsDefined(Envelope.FieldName.SOURCE);
checkFieldIsDefined(Envelope.FieldName.TRANSACTION);
if (!missingFields.isEmpty()) {
throw new IllegalStateException("The envelope schema is missing field(s) " + String.join(", ", missingFields));
}
return new Envelope(builder.build());
}
private void checkFieldIsDefined(String fieldName) {
if (builder.field(fieldName) == null) {
missingFields.add(fieldName);
}
}
};
}
}

View File

@ -29,7 +29,7 @@ public void shouldBuildWithSimpleOptionalTypesForBeforeAndAfter() {
assertThat(env.schema()).isNotNull();
assertThat(env.schema().name()).isEqualTo("someName");
assertThat(env.schema().doc()).isNull();
assertThat(env.schema().version()).isNull();
assertThat(env.schema().version()).isEqualTo(1);
assertOptionalField(env, Envelope.FieldName.AFTER, Schema.OPTIONAL_STRING_SCHEMA);
assertOptionalField(env, Envelope.FieldName.BEFORE, Schema.OPTIONAL_STRING_SCHEMA);
assertOptionalField(env, Envelope.FieldName.SOURCE, Schema.OPTIONAL_INT64_SCHEMA);