DBZ-5044 Use SchemaFactory for ConnectTable & EvenyDispatcher schemas

This commit is contained in:
Anisha Mohanty 2022-07-01 15:54:57 +05:30 committed by Jiri Pechanec
parent 8fb2ffd70a
commit aea7c1ffa6
3 changed files with 106 additions and 17 deletions

View File

@ -13,7 +13,6 @@
import java.util.function.Supplier;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.ConnectHeaders;
@ -47,6 +46,7 @@
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.schema.SchemaFactory;
import io.debezium.util.SchemaNameAdjuster;
/**
@ -83,6 +83,8 @@ public class EventDispatcher<P extends Partition, T extends DataCollectionId> im
private final Signal<P> signal;
private IncrementalSnapshotChangeEventSource<P, T> incrementalSnapshotChangeEventSource;
private static final SchemaFactory schemaFactoryObject = SchemaFactory.get();
/**
* Change event receiver for events dispatched from a streaming change event source.
*/
@ -126,19 +128,9 @@ public EventDispatcher(CommonConnectorConfig connectorConfig, TopicNamingStrateg
this.signal = new Signal<>(connectorConfig, this);
this.heartbeat = heartbeat;
schemaChangeKeySchema = SchemaBuilder.struct()
.name(schemaNameAdjuster.adjust("io.debezium.connector." + connectorConfig.getConnectorName() + ".SchemaChangeKey"))
.field(Fields.DATABASE_NAME, Schema.STRING_SCHEMA)
.build();
schemaChangeValueSchema = SchemaBuilder.struct()
.name(schemaNameAdjuster.adjust("io.debezium.connector." + connectorConfig.getConnectorName() + ".SchemaChangeValue"))
.field(Fields.SOURCE, connectorConfig.getSourceInfoStructMaker().schema())
.field(Fields.TIMESTAMP, Schema.INT64_SCHEMA)
.field(Fields.DATABASE_NAME, Schema.OPTIONAL_STRING_SCHEMA)
.field(Fields.SCHEMA_NAME, Schema.OPTIONAL_STRING_SCHEMA)
.field(Fields.DDL_STATEMENTS, Schema.OPTIONAL_STRING_SCHEMA)
.field(Fields.TABLE_CHANGES, SchemaBuilder.array(tableChangesSerializer.getChangeSchema()).build())
.build();
schemaChangeKeySchema = schemaFactoryObject.eventDispatcherKeySchema(schemaNameAdjuster, connectorConfig);
schemaChangeValueSchema = schemaFactoryObject.eventDispatcherValueSchema(schemaNameAdjuster, connectorConfig, tableChangesSerializer);
}
public void dispatchSnapshotEvent(P partition, T dataCollectionId, ChangeRecordEmitter<P> changeRecordEmitter,

View File

@ -16,6 +16,7 @@
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.history.TableChanges.TableChange;
import io.debezium.schema.SchemaFactory;
import io.debezium.util.SchemaNameAdjuster;
/**
@ -48,9 +49,11 @@ public class ConnectTableChangeSerializer implements TableChanges.TableChangesSe
public static final String DEFAULT_VALUE_EXPRESSION = "defaultValueExpression";
public static final String ENUM_VALUES = "enumValues";
private final Schema columnSchema;
private final Schema tableSchema;
private final Schema changeSchema;
private Schema columnSchema;
private Schema tableSchema;
private Schema changeSchema;
private static final SchemaFactory schemaFactoryObject = SchemaFactory.get();
public ConnectTableChangeSerializer(SchemaNameAdjuster schemaNameAdjuster) {
columnSchema = SchemaBuilder.struct()
@ -86,6 +89,12 @@ public ConnectTableChangeSerializer(SchemaNameAdjuster schemaNameAdjuster) {
.field(ID_KEY, Schema.STRING_SCHEMA)
.field(TABLE_KEY, tableSchema)
.build();
columnSchema = schemaFactoryObject.connectTableChangeSerializerColumnSchema(schemaNameAdjuster);
tableSchema = schemaFactoryObject.connectTableChangeSerializerTableSchema(schemaNameAdjuster);
changeSchema = schemaFactoryObject.connectTableChangeSerializerChangeSchema(schemaNameAdjuster);
}
public Schema getChangeSchema() {

View File

@ -8,9 +8,12 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.heartbeat.HeartbeatImpl;
import io.debezium.pipeline.txmetadata.TransactionMonitor;
import io.debezium.relational.history.ConnectTableChangeSerializer;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.util.SchemaNameAdjuster;
/**
@ -45,6 +48,29 @@ public class SchemaFactory {
private static final String TRANSACTION_EVENT_COUNT_COLLECTION_SCHEMA_NAME = "event.collection";
private static final int TRANSACTION_EVENT_COUNT_COLLECTION_SCHEMA_VERSION = 1;
/*
* Connect Table schemas
*/
private static final String CONNECT_TABLE_CHANGE_SERIALIZER_COLUMN_SCHEMA_NAME = "io.debezium.connector.schema.Column";
private static final int CONNECT_TABLE_CHANGE_SERIALIZER_COLUMN_SCHEMA_VERSION = 1;
private static final String CONNECT_TABLE_CHANGE_SERIALIZER_TABLE_SCHEMA_NAME = "io.debezium.connector.schema.Table";
private static final int CONNECT_TABLE_CHANGE_SERIALIZER_TABLE_SCHEMA_VERSION = 1;
private static final String CONNECT_TABLE_CHANGE_SERIALIZER_CHANGE_SCHEMA_NAME = "io.debezium.connector.schema.Change";
private static final int CONNECT_TABLE_CHANGE_SERIALIZER_CHANGE_SCHEMA_VERSION = 1;
/*
* Event Dispatcher schemas
*/
private static final String EVENT_DISPATCHER_SCHEMA_NAME_PREFIX = "io.debezium.connector.";
private static final String EVENT_DISPATCHER_KEY_SCHEMA_NAME_SUFFIX = ".SchemaChangeKey";
private static final int EVENT_DISPATCHER_KEY_SCHEMA_VERSION = 1;
private static final String EVENT_DISPATCHER_VALUE_SCHEMA_NAME_SUFFIX = ".SchemaChangeValue";
private static final int EVENT_DISPATCHER_VALUE_SCHEMA_VERSION = 1;
private static final SchemaFactory schemaFactoryObject = new SchemaFactory();
private SchemaFactory() {
@ -108,4 +134,66 @@ public Schema transactionValueSchema(SchemaNameAdjuster adjuster) {
.field(TransactionMonitor.DEBEZIUM_TRANSACTION_TS_MS, Schema.INT64_SCHEMA)
.build();
}
public Schema connectTableChangeSerializerColumnSchema(SchemaNameAdjuster adjuster) {
return SchemaBuilder.struct()
.name(adjuster.adjust(CONNECT_TABLE_CHANGE_SERIALIZER_COLUMN_SCHEMA_NAME))
.version(CONNECT_TABLE_CHANGE_SERIALIZER_COLUMN_SCHEMA_VERSION)
.field(ConnectTableChangeSerializer.NAME_KEY, Schema.STRING_SCHEMA)
.field(ConnectTableChangeSerializer.JDBC_TYPE_KEY, Schema.INT32_SCHEMA)
.field(ConnectTableChangeSerializer.NATIVE_TYPE_KEY, Schema.OPTIONAL_INT32_SCHEMA)
.field(ConnectTableChangeSerializer.TYPE_NAME_KEY, Schema.STRING_SCHEMA)
.field(ConnectTableChangeSerializer.TYPE_EXPRESSION_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(ConnectTableChangeSerializer.CHARSET_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(ConnectTableChangeSerializer.LENGTH_KEY, Schema.OPTIONAL_INT32_SCHEMA)
.field(ConnectTableChangeSerializer.SCALE_KEY, Schema.OPTIONAL_INT32_SCHEMA)
.field(ConnectTableChangeSerializer.POSITION_KEY, Schema.INT32_SCHEMA)
.field(ConnectTableChangeSerializer.OPTIONAL_KEY, Schema.OPTIONAL_BOOLEAN_SCHEMA)
.field(ConnectTableChangeSerializer.AUTO_INCREMENTED_KEY, Schema.OPTIONAL_BOOLEAN_SCHEMA)
.field(ConnectTableChangeSerializer.GENERATED_KEY, Schema.OPTIONAL_BOOLEAN_SCHEMA)
.field(ConnectTableChangeSerializer.COMMENT_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.build();
}
public Schema connectTableChangeSerializerTableSchema(SchemaNameAdjuster adjuster) {
return SchemaBuilder.struct()
.name(adjuster.adjust(CONNECT_TABLE_CHANGE_SERIALIZER_TABLE_SCHEMA_NAME))
.version(CONNECT_TABLE_CHANGE_SERIALIZER_TABLE_SCHEMA_VERSION)
.field(ConnectTableChangeSerializer.DEFAULT_CHARSET_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(ConnectTableChangeSerializer.PRIMARY_KEY_COLUMN_NAMES_KEY, SchemaBuilder.array(Schema.STRING_SCHEMA).optional().build())
.field(ConnectTableChangeSerializer.COLUMNS_KEY, SchemaBuilder.array(connectTableChangeSerializerColumnSchema(adjuster)).build())
.field(ConnectTableChangeSerializer.COMMENT_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.build();
}
public Schema connectTableChangeSerializerChangeSchema(SchemaNameAdjuster adjuster) {
return SchemaBuilder.struct()
.name(adjuster.adjust(CONNECT_TABLE_CHANGE_SERIALIZER_CHANGE_SCHEMA_NAME))
.version(CONNECT_TABLE_CHANGE_SERIALIZER_CHANGE_SCHEMA_VERSION)
.field(ConnectTableChangeSerializer.TYPE_KEY, Schema.STRING_SCHEMA)
.field(ConnectTableChangeSerializer.ID_KEY, Schema.STRING_SCHEMA)
.field(ConnectTableChangeSerializer.TABLE_KEY, connectTableChangeSerializerTableSchema(adjuster))
.build();
}
public Schema eventDispatcherKeySchema(SchemaNameAdjuster adjuster, CommonConnectorConfig config) {
return SchemaBuilder.struct()
.name(adjuster.adjust(String.format("%s%s%s", EVENT_DISPATCHER_SCHEMA_NAME_PREFIX, config.getConnectorName(), EVENT_DISPATCHER_KEY_SCHEMA_NAME_SUFFIX)))
.version(EVENT_DISPATCHER_KEY_SCHEMA_VERSION)
.field(HistoryRecord.Fields.DATABASE_NAME, Schema.STRING_SCHEMA)
.build();
}
public Schema eventDispatcherValueSchema(SchemaNameAdjuster adjuster, CommonConnectorConfig config, ConnectTableChangeSerializer serializer) {
return SchemaBuilder.struct()
.name(adjuster.adjust(String.format("%s%s%s", EVENT_DISPATCHER_SCHEMA_NAME_PREFIX, config.getConnectorName(), EVENT_DISPATCHER_VALUE_SCHEMA_NAME_SUFFIX)))
.version(EVENT_DISPATCHER_VALUE_SCHEMA_VERSION)
.field(HistoryRecord.Fields.SOURCE, config.getSourceInfoStructMaker().schema())
.field(HistoryRecord.Fields.TIMESTAMP, Schema.INT64_SCHEMA)
.field(HistoryRecord.Fields.DATABASE_NAME, Schema.OPTIONAL_STRING_SCHEMA)
.field(HistoryRecord.Fields.SCHEMA_NAME, Schema.OPTIONAL_STRING_SCHEMA)
.field(HistoryRecord.Fields.DDL_STATEMENTS, Schema.OPTIONAL_STRING_SCHEMA)
.field(HistoryRecord.Fields.TABLE_CHANGES, SchemaBuilder.array(serializer.getChangeSchema()).build())
.build();
}
}