DBZ-1292 Clean-up around data schema URI
This commit is contained in:
parent
1a18bdbc81
commit
a861dae432
@ -73,7 +73,6 @@
|
||||
*/
|
||||
public class CloudEventsConverter implements Converter {
|
||||
|
||||
private static final String SCHEMA_URL_PREFIX = "/schemas/ids/";
|
||||
private static final String EXTENSION_NAME_PREFIX = "iodebezium";
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(CloudEventsConverter.class);
|
||||
@ -211,20 +210,17 @@ public byte[] fromConnectData(String topic, Schema schema, Object value) {
|
||||
private SchemaAndValue convertToCloudEventsFormatWithDataAsAvro(String topic, RecordParser parser, CloudEventsMaker maker) {
|
||||
Schema dataSchemaType = Schema.BYTES_SCHEMA;
|
||||
byte[] serializedData = avroConverter.fromConnectData(topic, maker.ceDataAttributeSchema(), maker.ceDataAttribute());
|
||||
String dataSchemaUri = maker.ceDataschemaUri(getSchemaIdFromAvroMessage(serializedData));
|
||||
|
||||
// TODO DBZ-1701: the exported path should be configurable, e.g. it could be a
|
||||
// proxy URL for external consumers or even just be omitted altogether
|
||||
String dataSchema = maker.ceDataschema() + SCHEMA_URL_PREFIX + getSchemaIdFromAvroMessage(serializedData);
|
||||
|
||||
return convertToCloudEventsFormat(parser, maker, dataSchemaType, dataSchema, serializedData);
|
||||
return convertToCloudEventsFormat(parser, maker, dataSchemaType, dataSchemaUri, serializedData);
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtains the schema id from the given Avro record. They are prefixed by one magic byte,
|
||||
* followed by an int for the schem id.
|
||||
*/
|
||||
private int getSchemaIdFromAvroMessage(byte[] serializedData) {
|
||||
return ByteBuffer.wrap(serializedData, 1, 5).getInt();
|
||||
private String getSchemaIdFromAvroMessage(byte[] serializedData) {
|
||||
return String.valueOf(ByteBuffer.wrap(serializedData, 1, 5).getInt());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -25,6 +25,8 @@
|
||||
*/
|
||||
public abstract class CloudEventsMaker {
|
||||
|
||||
private static final String SCHEMA_URL_PATH = "/schemas/ids/";
|
||||
|
||||
/**
|
||||
* The constants for the names of CloudEvents attributes.
|
||||
*/
|
||||
@ -63,7 +65,7 @@ public static final class FieldName {
|
||||
public static final String CLOUDEVENTS_SPECVERSION = "1.0";
|
||||
|
||||
private final SerializerType dataContentType;
|
||||
private final String dataSchemaUrl;
|
||||
private final String dataSchemaUriBase;
|
||||
private final Schema ceDataAttributeSchema;
|
||||
|
||||
RecordParser recordParser;
|
||||
@ -81,16 +83,16 @@ public static final class FieldName {
|
||||
* @param schemaUri the URI of the schema in case of Avro; may be null
|
||||
* @return a concrete CloudEvents maker
|
||||
*/
|
||||
public static CloudEventsMaker create(RecordParser parser, SerializerType contentType, String schemaUri) {
|
||||
public static CloudEventsMaker create(RecordParser parser, SerializerType contentType, String dataSchemaUriBase) {
|
||||
switch (parser.connectorType()) {
|
||||
case "mysql":
|
||||
return new MysqlCloudEventsMaker(parser, contentType, schemaUri);
|
||||
return new MysqlCloudEventsMaker(parser, contentType, dataSchemaUriBase);
|
||||
case "postgresql":
|
||||
return new PostgresCloudEventsMaker(parser, contentType, schemaUri);
|
||||
return new PostgresCloudEventsMaker(parser, contentType, dataSchemaUriBase);
|
||||
case "mongodb":
|
||||
return new MongodbCloudEventsMaker(parser, contentType, schemaUri);
|
||||
return new MongodbCloudEventsMaker(parser, contentType, dataSchemaUriBase);
|
||||
case "sqlserver":
|
||||
return new SqlserverCloudEventsMaker(parser, contentType, schemaUri);
|
||||
return new SqlserverCloudEventsMaker(parser, contentType, dataSchemaUriBase);
|
||||
default:
|
||||
throw new DataException("No usable CloudEvents converters for connector type \"" + parser.connectorType() + "\"");
|
||||
}
|
||||
@ -110,10 +112,10 @@ public static CloudEventsMaker create(RecordParser parser, SerializerType conten
|
||||
return create(parser, contentType, null);
|
||||
}
|
||||
|
||||
private CloudEventsMaker(RecordParser parser, SerializerType contentType, String schemaUri) {
|
||||
private CloudEventsMaker(RecordParser parser, SerializerType contentType, String dataSchemaUriBase) {
|
||||
this.recordParser = parser;
|
||||
this.dataContentType = contentType;
|
||||
this.dataSchemaUrl = schemaUri;
|
||||
this.dataSchemaUriBase = dataSchemaUriBase;
|
||||
this.ceDataAttributeSchema = getDataSchema(recordParser);
|
||||
}
|
||||
|
||||
@ -180,8 +182,10 @@ public String ceDatacontenttype() {
|
||||
*
|
||||
* @return the data schema url of CloudEvents envelope
|
||||
*/
|
||||
public String ceDataschema() {
|
||||
return dataSchemaUrl;
|
||||
// TODO DBZ-1701: the exported path should be configurable, e.g. it could be a
|
||||
// proxy URL for external consumers or even just be omitted altogether
|
||||
public String ceDataschemaUri(String schemaId) {
|
||||
return dataSchemaUriBase + SCHEMA_URL_PATH + schemaId;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -245,8 +249,8 @@ private static String ceDataAttributeSchemaName(String connectorType) {
|
||||
* CloudEvents maker for records produced by MySQL connector.
|
||||
*/
|
||||
public static final class MysqlCloudEventsMaker extends CloudEventsMaker {
|
||||
MysqlCloudEventsMaker(RecordParser parser, SerializerType contentType, String schemaUri) {
|
||||
super(parser, contentType, schemaUri);
|
||||
MysqlCloudEventsMaker(RecordParser parser, SerializerType contentType, String dataSchemaUriBase) {
|
||||
super(parser, contentType, dataSchemaUriBase);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -261,8 +265,8 @@ public String ceId() {
|
||||
* CloudEvents maker for records produced by PostgreSQL connector.
|
||||
*/
|
||||
public static final class PostgresCloudEventsMaker extends CloudEventsMaker {
|
||||
PostgresCloudEventsMaker(RecordParser parser, SerializerType contentType, String schemaUri) {
|
||||
super(parser, contentType, schemaUri);
|
||||
PostgresCloudEventsMaker(RecordParser parser, SerializerType contentType, String dataSchemaUriBase) {
|
||||
super(parser, contentType, dataSchemaUriBase);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -277,8 +281,8 @@ public String ceId() {
|
||||
* CloudEvents maker for records produced by MongoDB connector.
|
||||
*/
|
||||
public static final class MongodbCloudEventsMaker extends CloudEventsMaker {
|
||||
MongodbCloudEventsMaker(RecordParser parser, SerializerType contentType, String schemaUri) {
|
||||
super(parser, contentType, schemaUri);
|
||||
MongodbCloudEventsMaker(RecordParser parser, SerializerType contentType, String dataSchemaUriBase) {
|
||||
super(parser, contentType, dataSchemaUriBase);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -292,8 +296,8 @@ public String ceId() {
|
||||
* CloudEvents maker for records produced by SQL Server connector.
|
||||
*/
|
||||
public static final class SqlserverCloudEventsMaker extends CloudEventsMaker {
|
||||
SqlserverCloudEventsMaker(RecordParser parser, SerializerType contentType, String schemaUri) {
|
||||
super(parser, contentType, schemaUri);
|
||||
SqlserverCloudEventsMaker(RecordParser parser, SerializerType contentType, String dataSchemaUriBase) {
|
||||
super(parser, contentType, dataSchemaUriBase);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
Loading…
Reference in New Issue
Block a user