diff --git a/debezium-core/src/main/java/io/debezium/converters/CloudEventsConverter.java b/debezium-core/src/main/java/io/debezium/converters/CloudEventsConverter.java index 8dfdd12d2..01ad8f8df 100644 --- a/debezium-core/src/main/java/io/debezium/converters/CloudEventsConverter.java +++ b/debezium-core/src/main/java/io/debezium/converters/CloudEventsConverter.java @@ -73,7 +73,6 @@ */ public class CloudEventsConverter implements Converter { - private static final String SCHEMA_URL_PREFIX = "/schemas/ids/"; private static final String EXTENSION_NAME_PREFIX = "iodebezium"; private static final Logger LOGGER = LoggerFactory.getLogger(CloudEventsConverter.class); @@ -211,20 +210,17 @@ public byte[] fromConnectData(String topic, Schema schema, Object value) { private SchemaAndValue convertToCloudEventsFormatWithDataAsAvro(String topic, RecordParser parser, CloudEventsMaker maker) { Schema dataSchemaType = Schema.BYTES_SCHEMA; byte[] serializedData = avroConverter.fromConnectData(topic, maker.ceDataAttributeSchema(), maker.ceDataAttribute()); + String dataSchemaUri = maker.ceDataschemaUri(getSchemaIdFromAvroMessage(serializedData)); - // TODO DBZ-1701: the exported path should be configurable, e.g. it could be a - // proxy URL for external consumers or even just be omitted altogether - String dataSchema = maker.ceDataschema() + SCHEMA_URL_PREFIX + getSchemaIdFromAvroMessage(serializedData); - - return convertToCloudEventsFormat(parser, maker, dataSchemaType, dataSchema, serializedData); + return convertToCloudEventsFormat(parser, maker, dataSchemaType, dataSchemaUri, serializedData); } /** * Obtains the schema id from the given Avro record. They are prefixed by one magic byte, * followed by an int for the schem id. */ - private int getSchemaIdFromAvroMessage(byte[] serializedData) { - return ByteBuffer.wrap(serializedData, 1, 5).getInt(); + private String getSchemaIdFromAvroMessage(byte[] serializedData) { + return String.valueOf(ByteBuffer.wrap(serializedData, 1, 5).getInt()); } @Override diff --git a/debezium-core/src/main/java/io/debezium/converters/CloudEventsMaker.java b/debezium-core/src/main/java/io/debezium/converters/CloudEventsMaker.java index a8b4b2fc1..878907f53 100644 --- a/debezium-core/src/main/java/io/debezium/converters/CloudEventsMaker.java +++ b/debezium-core/src/main/java/io/debezium/converters/CloudEventsMaker.java @@ -25,6 +25,8 @@ */ public abstract class CloudEventsMaker { + private static final String SCHEMA_URL_PATH = "/schemas/ids/"; + /** * The constants for the names of CloudEvents attributes. */ @@ -63,7 +65,7 @@ public static final class FieldName { public static final String CLOUDEVENTS_SPECVERSION = "1.0"; private final SerializerType dataContentType; - private final String dataSchemaUrl; + private final String dataSchemaUriBase; private final Schema ceDataAttributeSchema; RecordParser recordParser; @@ -81,16 +83,16 @@ public static final class FieldName { * @param schemaUri the URI of the schema in case of Avro; may be null * @return a concrete CloudEvents maker */ - public static CloudEventsMaker create(RecordParser parser, SerializerType contentType, String schemaUri) { + public static CloudEventsMaker create(RecordParser parser, SerializerType contentType, String dataSchemaUriBase) { switch (parser.connectorType()) { case "mysql": - return new MysqlCloudEventsMaker(parser, contentType, schemaUri); + return new MysqlCloudEventsMaker(parser, contentType, dataSchemaUriBase); case "postgresql": - return new PostgresCloudEventsMaker(parser, contentType, schemaUri); + return new PostgresCloudEventsMaker(parser, contentType, dataSchemaUriBase); case "mongodb": - return new MongodbCloudEventsMaker(parser, contentType, schemaUri); + return new MongodbCloudEventsMaker(parser, contentType, dataSchemaUriBase); case "sqlserver": - return new SqlserverCloudEventsMaker(parser, contentType, schemaUri); + return new SqlserverCloudEventsMaker(parser, contentType, dataSchemaUriBase); default: throw new DataException("No usable CloudEvents converters for connector type \"" + parser.connectorType() + "\""); } @@ -110,10 +112,10 @@ public static CloudEventsMaker create(RecordParser parser, SerializerType conten return create(parser, contentType, null); } - private CloudEventsMaker(RecordParser parser, SerializerType contentType, String schemaUri) { + private CloudEventsMaker(RecordParser parser, SerializerType contentType, String dataSchemaUriBase) { this.recordParser = parser; this.dataContentType = contentType; - this.dataSchemaUrl = schemaUri; + this.dataSchemaUriBase = dataSchemaUriBase; this.ceDataAttributeSchema = getDataSchema(recordParser); } @@ -180,8 +182,10 @@ public String ceDatacontenttype() { * * @return the data schema url of CloudEvents envelope */ - public String ceDataschema() { - return dataSchemaUrl; + // TODO DBZ-1701: the exported path should be configurable, e.g. it could be a + // proxy URL for external consumers or even just be omitted altogether + public String ceDataschemaUri(String schemaId) { + return dataSchemaUriBase + SCHEMA_URL_PATH + schemaId; } /** @@ -245,8 +249,8 @@ private static String ceDataAttributeSchemaName(String connectorType) { * CloudEvents maker for records produced by MySQL connector. */ public static final class MysqlCloudEventsMaker extends CloudEventsMaker { - MysqlCloudEventsMaker(RecordParser parser, SerializerType contentType, String schemaUri) { - super(parser, contentType, schemaUri); + MysqlCloudEventsMaker(RecordParser parser, SerializerType contentType, String dataSchemaUriBase) { + super(parser, contentType, dataSchemaUriBase); } @Override @@ -261,8 +265,8 @@ public String ceId() { * CloudEvents maker for records produced by PostgreSQL connector. */ public static final class PostgresCloudEventsMaker extends CloudEventsMaker { - PostgresCloudEventsMaker(RecordParser parser, SerializerType contentType, String schemaUri) { - super(parser, contentType, schemaUri); + PostgresCloudEventsMaker(RecordParser parser, SerializerType contentType, String dataSchemaUriBase) { + super(parser, contentType, dataSchemaUriBase); } @Override @@ -277,8 +281,8 @@ public String ceId() { * CloudEvents maker for records produced by MongoDB connector. */ public static final class MongodbCloudEventsMaker extends CloudEventsMaker { - MongodbCloudEventsMaker(RecordParser parser, SerializerType contentType, String schemaUri) { - super(parser, contentType, schemaUri); + MongodbCloudEventsMaker(RecordParser parser, SerializerType contentType, String dataSchemaUriBase) { + super(parser, contentType, dataSchemaUriBase); } @Override @@ -292,8 +296,8 @@ public String ceId() { * CloudEvents maker for records produced by SQL Server connector. */ public static final class SqlserverCloudEventsMaker extends CloudEventsMaker { - SqlserverCloudEventsMaker(RecordParser parser, SerializerType contentType, String schemaUri) { - super(parser, contentType, schemaUri); + SqlserverCloudEventsMaker(RecordParser parser, SerializerType contentType, String dataSchemaUriBase) { + super(parser, contentType, dataSchemaUriBase); } @Override