diff --git a/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java b/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java index 916b02110..f244d46cd 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java @@ -13,7 +13,6 @@ import java.util.function.Supplier; import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.header.ConnectHeaders; @@ -47,6 +46,7 @@ import io.debezium.schema.SchemaChangeEvent; import io.debezium.spi.schema.DataCollectionId; import io.debezium.spi.topic.TopicNamingStrategy; +import io.debezium.schema.SchemaFactory; import io.debezium.util.SchemaNameAdjuster; /** @@ -83,6 +83,8 @@ public class EventDispatcher

im private final Signal

signal; private IncrementalSnapshotChangeEventSource incrementalSnapshotChangeEventSource; + private static final SchemaFactory schemaFactoryObject = SchemaFactory.get(); + /** * Change event receiver for events dispatched from a streaming change event source. */ @@ -126,19 +128,9 @@ public EventDispatcher(CommonConnectorConfig connectorConfig, TopicNamingStrateg this.signal = new Signal<>(connectorConfig, this); this.heartbeat = heartbeat; - schemaChangeKeySchema = SchemaBuilder.struct() - .name(schemaNameAdjuster.adjust("io.debezium.connector." + connectorConfig.getConnectorName() + ".SchemaChangeKey")) - .field(Fields.DATABASE_NAME, Schema.STRING_SCHEMA) - .build(); - schemaChangeValueSchema = SchemaBuilder.struct() - .name(schemaNameAdjuster.adjust("io.debezium.connector." + connectorConfig.getConnectorName() + ".SchemaChangeValue")) - .field(Fields.SOURCE, connectorConfig.getSourceInfoStructMaker().schema()) - .field(Fields.TIMESTAMP, Schema.INT64_SCHEMA) - .field(Fields.DATABASE_NAME, Schema.OPTIONAL_STRING_SCHEMA) - .field(Fields.SCHEMA_NAME, Schema.OPTIONAL_STRING_SCHEMA) - .field(Fields.DDL_STATEMENTS, Schema.OPTIONAL_STRING_SCHEMA) - .field(Fields.TABLE_CHANGES, SchemaBuilder.array(tableChangesSerializer.getChangeSchema()).build()) - .build(); + schemaChangeKeySchema = schemaFactoryObject.eventDispatcherKeySchema(schemaNameAdjuster, connectorConfig); + + schemaChangeValueSchema = schemaFactoryObject.eventDispatcherValueSchema(schemaNameAdjuster, connectorConfig, tableChangesSerializer); } public void dispatchSnapshotEvent(P partition, T dataCollectionId, ChangeRecordEmitter

changeRecordEmitter, diff --git a/debezium-core/src/main/java/io/debezium/relational/history/ConnectTableChangeSerializer.java b/debezium-core/src/main/java/io/debezium/relational/history/ConnectTableChangeSerializer.java index 347ca5d1f..39f2a1339 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/ConnectTableChangeSerializer.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/ConnectTableChangeSerializer.java @@ -16,6 +16,7 @@ import io.debezium.relational.Column; import io.debezium.relational.Table; import io.debezium.relational.history.TableChanges.TableChange; +import io.debezium.schema.SchemaFactory; import io.debezium.util.SchemaNameAdjuster; /** @@ -48,9 +49,11 @@ public class ConnectTableChangeSerializer implements TableChanges.TableChangesSe public static final String DEFAULT_VALUE_EXPRESSION = "defaultValueExpression"; public static final String ENUM_VALUES = "enumValues"; - private final Schema columnSchema; - private final Schema tableSchema; - private final Schema changeSchema; + private Schema columnSchema; + private Schema tableSchema; + private Schema changeSchema; + + private static final SchemaFactory schemaFactoryObject = SchemaFactory.get(); public ConnectTableChangeSerializer(SchemaNameAdjuster schemaNameAdjuster) { columnSchema = SchemaBuilder.struct() @@ -86,6 +89,12 @@ public ConnectTableChangeSerializer(SchemaNameAdjuster schemaNameAdjuster) { .field(ID_KEY, Schema.STRING_SCHEMA) .field(TABLE_KEY, tableSchema) .build(); + + columnSchema = schemaFactoryObject.connectTableChangeSerializerColumnSchema(schemaNameAdjuster); + + tableSchema = schemaFactoryObject.connectTableChangeSerializerTableSchema(schemaNameAdjuster); + + changeSchema = schemaFactoryObject.connectTableChangeSerializerChangeSchema(schemaNameAdjuster); } public Schema getChangeSchema() { diff --git a/debezium-core/src/main/java/io/debezium/schema/SchemaFactory.java b/debezium-core/src/main/java/io/debezium/schema/SchemaFactory.java index 305fbd3e8..7d3b1fe59 100644 --- a/debezium-core/src/main/java/io/debezium/schema/SchemaFactory.java +++ b/debezium-core/src/main/java/io/debezium/schema/SchemaFactory.java @@ -8,9 +8,12 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; +import io.debezium.config.CommonConnectorConfig; import io.debezium.connector.AbstractSourceInfo; import io.debezium.heartbeat.HeartbeatImpl; import io.debezium.pipeline.txmetadata.TransactionMonitor; +import io.debezium.relational.history.ConnectTableChangeSerializer; +import io.debezium.relational.history.HistoryRecord; import io.debezium.util.SchemaNameAdjuster; /** @@ -45,6 +48,29 @@ public class SchemaFactory { private static final String TRANSACTION_EVENT_COUNT_COLLECTION_SCHEMA_NAME = "event.collection"; private static final int TRANSACTION_EVENT_COUNT_COLLECTION_SCHEMA_VERSION = 1; + /* + * Connect Table schemas + */ + private static final String CONNECT_TABLE_CHANGE_SERIALIZER_COLUMN_SCHEMA_NAME = "io.debezium.connector.schema.Column"; + private static final int CONNECT_TABLE_CHANGE_SERIALIZER_COLUMN_SCHEMA_VERSION = 1; + + private static final String CONNECT_TABLE_CHANGE_SERIALIZER_TABLE_SCHEMA_NAME = "io.debezium.connector.schema.Table"; + private static final int CONNECT_TABLE_CHANGE_SERIALIZER_TABLE_SCHEMA_VERSION = 1; + + private static final String CONNECT_TABLE_CHANGE_SERIALIZER_CHANGE_SCHEMA_NAME = "io.debezium.connector.schema.Change"; + private static final int CONNECT_TABLE_CHANGE_SERIALIZER_CHANGE_SCHEMA_VERSION = 1; + + /* + * Event Dispatcher schemas + */ + private static final String EVENT_DISPATCHER_SCHEMA_NAME_PREFIX = "io.debezium.connector."; + + private static final String EVENT_DISPATCHER_KEY_SCHEMA_NAME_SUFFIX = ".SchemaChangeKey"; + private static final int EVENT_DISPATCHER_KEY_SCHEMA_VERSION = 1; + + private static final String EVENT_DISPATCHER_VALUE_SCHEMA_NAME_SUFFIX = ".SchemaChangeValue"; + private static final int EVENT_DISPATCHER_VALUE_SCHEMA_VERSION = 1; + private static final SchemaFactory schemaFactoryObject = new SchemaFactory(); private SchemaFactory() { @@ -108,4 +134,66 @@ public Schema transactionValueSchema(SchemaNameAdjuster adjuster) { .field(TransactionMonitor.DEBEZIUM_TRANSACTION_TS_MS, Schema.INT64_SCHEMA) .build(); } + + public Schema connectTableChangeSerializerColumnSchema(SchemaNameAdjuster adjuster) { + return SchemaBuilder.struct() + .name(adjuster.adjust(CONNECT_TABLE_CHANGE_SERIALIZER_COLUMN_SCHEMA_NAME)) + .version(CONNECT_TABLE_CHANGE_SERIALIZER_COLUMN_SCHEMA_VERSION) + .field(ConnectTableChangeSerializer.NAME_KEY, Schema.STRING_SCHEMA) + .field(ConnectTableChangeSerializer.JDBC_TYPE_KEY, Schema.INT32_SCHEMA) + .field(ConnectTableChangeSerializer.NATIVE_TYPE_KEY, Schema.OPTIONAL_INT32_SCHEMA) + .field(ConnectTableChangeSerializer.TYPE_NAME_KEY, Schema.STRING_SCHEMA) + .field(ConnectTableChangeSerializer.TYPE_EXPRESSION_KEY, Schema.OPTIONAL_STRING_SCHEMA) + .field(ConnectTableChangeSerializer.CHARSET_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA) + .field(ConnectTableChangeSerializer.LENGTH_KEY, Schema.OPTIONAL_INT32_SCHEMA) + .field(ConnectTableChangeSerializer.SCALE_KEY, Schema.OPTIONAL_INT32_SCHEMA) + .field(ConnectTableChangeSerializer.POSITION_KEY, Schema.INT32_SCHEMA) + .field(ConnectTableChangeSerializer.OPTIONAL_KEY, Schema.OPTIONAL_BOOLEAN_SCHEMA) + .field(ConnectTableChangeSerializer.AUTO_INCREMENTED_KEY, Schema.OPTIONAL_BOOLEAN_SCHEMA) + .field(ConnectTableChangeSerializer.GENERATED_KEY, Schema.OPTIONAL_BOOLEAN_SCHEMA) + .field(ConnectTableChangeSerializer.COMMENT_KEY, Schema.OPTIONAL_STRING_SCHEMA) + .build(); + } + + public Schema connectTableChangeSerializerTableSchema(SchemaNameAdjuster adjuster) { + return SchemaBuilder.struct() + .name(adjuster.adjust(CONNECT_TABLE_CHANGE_SERIALIZER_TABLE_SCHEMA_NAME)) + .version(CONNECT_TABLE_CHANGE_SERIALIZER_TABLE_SCHEMA_VERSION) + .field(ConnectTableChangeSerializer.DEFAULT_CHARSET_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA) + .field(ConnectTableChangeSerializer.PRIMARY_KEY_COLUMN_NAMES_KEY, SchemaBuilder.array(Schema.STRING_SCHEMA).optional().build()) + .field(ConnectTableChangeSerializer.COLUMNS_KEY, SchemaBuilder.array(connectTableChangeSerializerColumnSchema(adjuster)).build()) + .field(ConnectTableChangeSerializer.COMMENT_KEY, Schema.OPTIONAL_STRING_SCHEMA) + .build(); + } + + public Schema connectTableChangeSerializerChangeSchema(SchemaNameAdjuster adjuster) { + return SchemaBuilder.struct() + .name(adjuster.adjust(CONNECT_TABLE_CHANGE_SERIALIZER_CHANGE_SCHEMA_NAME)) + .version(CONNECT_TABLE_CHANGE_SERIALIZER_CHANGE_SCHEMA_VERSION) + .field(ConnectTableChangeSerializer.TYPE_KEY, Schema.STRING_SCHEMA) + .field(ConnectTableChangeSerializer.ID_KEY, Schema.STRING_SCHEMA) + .field(ConnectTableChangeSerializer.TABLE_KEY, connectTableChangeSerializerTableSchema(adjuster)) + .build(); + } + + public Schema eventDispatcherKeySchema(SchemaNameAdjuster adjuster, CommonConnectorConfig config) { + return SchemaBuilder.struct() + .name(adjuster.adjust(String.format("%s%s%s", EVENT_DISPATCHER_SCHEMA_NAME_PREFIX, config.getConnectorName(), EVENT_DISPATCHER_KEY_SCHEMA_NAME_SUFFIX))) + .version(EVENT_DISPATCHER_KEY_SCHEMA_VERSION) + .field(HistoryRecord.Fields.DATABASE_NAME, Schema.STRING_SCHEMA) + .build(); + } + + public Schema eventDispatcherValueSchema(SchemaNameAdjuster adjuster, CommonConnectorConfig config, ConnectTableChangeSerializer serializer) { + return SchemaBuilder.struct() + .name(adjuster.adjust(String.format("%s%s%s", EVENT_DISPATCHER_SCHEMA_NAME_PREFIX, config.getConnectorName(), EVENT_DISPATCHER_VALUE_SCHEMA_NAME_SUFFIX))) + .version(EVENT_DISPATCHER_VALUE_SCHEMA_VERSION) + .field(HistoryRecord.Fields.SOURCE, config.getSourceInfoStructMaker().schema()) + .field(HistoryRecord.Fields.TIMESTAMP, Schema.INT64_SCHEMA) + .field(HistoryRecord.Fields.DATABASE_NAME, Schema.OPTIONAL_STRING_SCHEMA) + .field(HistoryRecord.Fields.SCHEMA_NAME, Schema.OPTIONAL_STRING_SCHEMA) + .field(HistoryRecord.Fields.DDL_STATEMENTS, Schema.OPTIONAL_STRING_SCHEMA) + .field(HistoryRecord.Fields.TABLE_CHANGES, SchemaBuilder.array(serializer.getChangeSchema()).build()) + .build(); + } }